diff --git a/examples/lib/cryptconverter.rb b/examples/lib/cryptconverter.rb index 30fcc6a7..b3c7b77a 100644 --- a/examples/lib/cryptconverter.rb +++ b/examples/lib/cryptconverter.rb @@ -1,7 +1,7 @@ require 'openssl' module Temporal - class CryptConverter < Temporal::Client::Converter::Base + class CryptConverter < Temporal::Connection::Converter::Base CIPHER = 'aes-256-gcm'.freeze GCM_NONCE_SIZE = 12 GCM_TAG_SIZE = 16 diff --git a/examples/spec/helpers.rb b/examples/spec/helpers.rb index 04078f99..c8292d6b 100644 --- a/examples/spec/helpers.rb +++ b/examples/spec/helpers.rb @@ -23,9 +23,9 @@ def wait_for_workflow_completion(workflow_id, run_id) end def fetch_history(workflow_id, run_id, options = {}) - client = Temporal.send(:client) + connection = Temporal.send(:connection) - result = client.get_workflow_execution_history( + result = connection.get_workflow_execution_history( { namespace: Temporal.configuration.namespace, workflow_id: workflow_id, diff --git a/lib/temporal.rb b/lib/temporal.rb index 22f3434b..cc0dc343 100644 --- a/lib/temporal.rb +++ b/lib/temporal.rb @@ -4,7 +4,7 @@ require 'securerandom' require 'temporal/configuration' require 'temporal/execution_options' -require 'temporal/client' +require 'temporal/connection' require 'temporal/activity' require 'temporal/activity/async_token' require 'temporal/workflow' @@ -21,10 +21,10 @@ def start_workflow(workflow, *input, **args) options = args.delete(:options) || {} input << args unless args.empty? - execution_options = ExecutionOptions.new(workflow, options) + execution_options = ExecutionOptions.new(workflow, options, config.default_execution_options) workflow_id = options[:workflow_id] || SecureRandom.uuid - response = client.start_workflow_execution( + response = connection.start_workflow_execution( namespace: execution_options.namespace, workflow_id: workflow_id, workflow_name: execution_options.name, @@ -45,10 +45,10 @@ def schedule_workflow(workflow, cron_schedule, *input, **args) options = args.delete(:options) || {} input << args unless args.empty? - execution_options = ExecutionOptions.new(workflow, options) + execution_options = ExecutionOptions.new(workflow, options, config.default_execution_options) workflow_id = options[:workflow_id] || SecureRandom.uuid - response = client.start_workflow_execution( + response = connection.start_workflow_execution( namespace: execution_options.namespace, workflow_id: workflow_id, workflow_name: execution_options.name, @@ -69,13 +69,13 @@ def schedule_workflow(workflow, cron_schedule, *input, **args) end def register_namespace(name, description = nil) - client.register_namespace(name: name, description: description) + connection.register_namespace(name: name, description: description) end def signal_workflow(workflow, signal, workflow_id, run_id, input = nil) - execution_options = ExecutionOptions.new(workflow) + execution_options = ExecutionOptions.new(workflow, {}, config.default_execution_options) - client.signal_workflow_execution( + connection.signal_workflow_execution( namespace: execution_options.namespace, # TODO: allow passing namespace instead workflow_id: workflow_id, run_id: run_id, @@ -95,11 +95,11 @@ def signal_workflow(workflow, signal, workflow_id, run_id, input = nil) # namespace: if nil, choose the one declared on the Workflow, or the global default def await_workflow_result(workflow, workflow_id:, run_id: nil, timeout: nil, namespace: nil) options = namespace ? {namespace: namespace} : {} - execution_options = ExecutionOptions.new(workflow, options) - max_timeout = Temporal::Client::GRPCClient::SERVER_MAX_GET_WORKFLOW_EXECUTION_HISTORY_POLL + execution_options = ExecutionOptions.new(workflow, options, config.default_execution_options) + max_timeout = Temporal::Connection::GRPC::SERVER_MAX_GET_WORKFLOW_EXECUTION_HISTORY_POLL history_response = nil begin - history_response = client.get_workflow_execution_history( + history_response = connection.get_workflow_execution_history( namespace: execution_options.namespace, workflow_id: workflow_id, run_id: run_id, @@ -143,7 +143,7 @@ def reset_workflow(namespace, workflow_id, run_id, workflow_task_id: nil, reason workflow_task_id ||= get_last_completed_workflow_task_id(namespace, workflow_id, run_id) raise Error, 'Could not find a completed workflow task event' unless workflow_task_id - response = client.reset_workflow_execution( + response = connection.reset_workflow_execution( namespace: namespace, workflow_id: workflow_id, run_id: run_id, @@ -157,7 +157,7 @@ def reset_workflow(namespace, workflow_id, run_id, workflow_task_id: nil, reason def terminate_workflow(workflow_id, namespace: nil, run_id: nil, reason: nil, details: nil) namespace ||= Temporal.configuration.namespace - client.terminate_workflow_execution( + connection.terminate_workflow_execution( namespace: namespace, workflow_id: workflow_id, run_id: run_id, @@ -167,7 +167,7 @@ def terminate_workflow(workflow_id, namespace: nil, run_id: nil, reason: nil, de end def fetch_workflow_execution_info(namespace, workflow_id, run_id) - response = client.describe_workflow_execution( + response = connection.describe_workflow_execution( namespace: namespace, workflow_id: workflow_id, run_id: run_id @@ -179,7 +179,7 @@ def fetch_workflow_execution_info(namespace, workflow_id, run_id) def complete_activity(async_token, result = nil) details = Activity::AsyncToken.decode(async_token) - client.respond_activity_task_completed_by_id( + connection.respond_activity_task_completed_by_id( namespace: details.namespace, activity_id: details.activity_id, workflow_id: details.workflow_id, @@ -191,7 +191,7 @@ def complete_activity(async_token, result = nil) def fail_activity(async_token, exception) details = Activity::AsyncToken.decode(async_token) - client.respond_activity_task_failed_by_id( + connection.respond_activity_task_failed_by_id( namespace: details.namespace, activity_id: details.activity_id, workflow_id: details.workflow_id, @@ -201,19 +201,20 @@ def fail_activity(async_token, exception) end def configure(&block) - yield configuration + yield config end def configuration - @configuration ||= Configuration.new + warn '[DEPRECATION] This method is now deprecated without a substitution' + config end def logger - configuration.logger + config.logger end def metrics - @metrics ||= Metrics.new(configuration.metrics_adapter) + @metrics ||= Metrics.new(config.metrics_adapter) end class ResultConverter @@ -222,13 +223,17 @@ class ResultConverter private_constant :ResultConverter private + + def config + @config ||= Configuration.new + end - def client - @client ||= Temporal::Client.generate + def connection + @connection ||= Temporal::Connection.generate(config.for_connection) end def get_last_completed_workflow_task_id(namespace, workflow_id, run_id) - history_response = client.get_workflow_execution_history( + history_response = connection.get_workflow_execution_history( namespace: namespace, workflow_id: workflow_id, run_id: run_id diff --git a/lib/temporal/activity/context.rb b/lib/temporal/activity/context.rb index ded09127..2c65c5af 100644 --- a/lib/temporal/activity/context.rb +++ b/lib/temporal/activity/context.rb @@ -7,8 +7,8 @@ module Temporal class Activity class Context - def initialize(client, metadata) - @client = client + def initialize(connection, metadata) + @connection = connection @metadata = metadata @async = false end @@ -32,7 +32,7 @@ def async_token def heartbeat(details = nil) logger.debug("Activity heartbeat", metadata.to_h) - client.record_activity_task_heartbeat(task_token: task_token, details: details) + connection.record_activity_task_heartbeat(task_token: task_token, details: details) end def heartbeat_details @@ -58,7 +58,7 @@ def headers private - attr_reader :client, :metadata + attr_reader :connection, :metadata def task_token metadata.task_token diff --git a/lib/temporal/activity/poller.rb b/lib/temporal/activity/poller.rb index be18e295..c07eba7e 100644 --- a/lib/temporal/activity/poller.rb +++ b/lib/temporal/activity/poller.rb @@ -1,4 +1,4 @@ -require 'temporal/client' +require 'temporal/connection' require 'temporal/thread_pool' require 'temporal/middleware/chain' require 'temporal/activity/task_processor' @@ -11,10 +11,11 @@ class Poller thread_pool_size: 20 }.freeze - def initialize(namespace, task_queue, activity_lookup, middleware = [], options = {}) + def initialize(namespace, task_queue, activity_lookup, config, middleware = [], options = {}) @namespace = namespace @task_queue = task_queue @activity_lookup = activity_lookup + @config = config @middleware = middleware @shutting_down = false @options = DEFAULT_OPTIONS.merge(options) @@ -31,7 +32,7 @@ def stop_polling end def cancel_pending_requests - client.cancel_polling_request + connection.cancel_polling_request end def wait @@ -41,10 +42,10 @@ def wait private - attr_reader :namespace, :task_queue, :activity_lookup, :middleware, :options, :thread + attr_reader :namespace, :task_queue, :activity_lookup, :config, :middleware, :options, :thread - def client - @client ||= Temporal::Client.generate + def connection + @connection ||= Temporal::Connection.generate(config.for_connection) end def shutting_down? @@ -73,7 +74,7 @@ def poll_loop end def poll_for_task - client.poll_activity_task_queue(namespace: namespace, task_queue: task_queue) + connection.poll_activity_task_queue(namespace: namespace, task_queue: task_queue) rescue StandardError => error Temporal.logger.error("Unable to poll activity task queue", { namespace: namespace, task_queue: task_queue, error: error.inspect }) @@ -85,7 +86,7 @@ def poll_for_task def process(task) middleware_chain = Middleware::Chain.new(middleware) - TaskProcessor.new(task, namespace, activity_lookup, client, middleware_chain).process + TaskProcessor.new(task, namespace, activity_lookup, middleware_chain, config).process end def thread_pool diff --git a/lib/temporal/activity/task_processor.rb b/lib/temporal/activity/task_processor.rb index bd3da635..173a2bc9 100644 --- a/lib/temporal/activity/task_processor.rb +++ b/lib/temporal/activity/task_processor.rb @@ -3,22 +3,23 @@ require 'temporal/errors' require 'temporal/activity/context' require 'temporal/concerns/payloads' -require 'temporal/client/retryer' +require 'temporal/connection/retryer' +require 'temporal/connection' module Temporal class Activity class TaskProcessor include Concerns::Payloads - def initialize(task, namespace, activity_lookup, client, middleware_chain) + def initialize(task, namespace, activity_lookup, middleware_chain, config) @task = task @namespace = namespace @metadata = Metadata.generate(Metadata::ACTIVITY_TYPE, task, namespace) @task_token = task.task_token @activity_name = task.activity_type.name @activity_class = activity_lookup.find(activity_name) - @client = client @middleware_chain = middleware_chain + @config = config end def process @@ -27,7 +28,7 @@ def process Temporal.logger.debug("Processing Activity task", metadata.to_h) Temporal.metrics.timing('activity_task.queue_time', queue_time_ms, activity: activity_name) - context = Activity::Context.new(client, metadata) + context = Activity::Context.new(connection, metadata) if !activity_class raise ActivityNotRegistered, 'Activity is not registered with this worker' @@ -51,7 +52,12 @@ def process private - attr_reader :task, :namespace, :task_token, :activity_name, :activity_class, :client, :middleware_chain, :metadata + attr_reader :task, :namespace, :task_token, :activity_name, :activity_class, + :middleware_chain, :metadata, :config + + def connection + @connection ||= Temporal::Connection.generate(config.for_connection) + end def queue_time_ms scheduled = task.current_attempt_scheduled_time.to_f @@ -64,8 +70,8 @@ def respond_completed(result) log_retry = proc do Temporal.logger.debug("Failed to report activity task completion, retrying", metadata.to_h) end - Temporal::Client::Retryer.with_retries(on_retry: log_retry) do - client.respond_activity_task_completed(task_token: task_token, result: result) + Temporal::Connection::Retryer.with_retries(on_retry: log_retry) do + connection.respond_activity_task_completed(task_token: task_token, result: result) end rescue StandardError => error Temporal.logger.error("Unable to complete Activity", metadata.to_h.merge(error: error.inspect)) @@ -78,8 +84,8 @@ def respond_failed(error) log_retry = proc do Temporal.logger.debug("Failed to report activity task failure, retrying", metadata.to_h) end - Temporal::Client::Retryer.with_retries(on_retry: log_retry) do - client.respond_activity_task_failed(task_token: task_token, exception: error) + Temporal::Connection::Retryer.with_retries(on_retry: log_retry) do + connection.respond_activity_task_failed(task_token: task_token, exception: error) end rescue StandardError => error Temporal.logger.error("Unable to fail Activity task", metadata.to_h.merge(error: error.inspect)) diff --git a/lib/temporal/client.rb b/lib/temporal/client.rb deleted file mode 100644 index 2c9952ea..00000000 --- a/lib/temporal/client.rb +++ /dev/null @@ -1,21 +0,0 @@ -require 'temporal/client/grpc_client' - -module Temporal - module Client - CLIENT_TYPES_MAP = { - grpc: Temporal::Client::GRPCClient - }.freeze - - def self.generate - client_class = CLIENT_TYPES_MAP[Temporal.configuration.client_type] - host = Temporal.configuration.host - port = Temporal.configuration.port - - hostname = `hostname` - thread_id = Thread.current.object_id - identity = "#{thread_id}@#{hostname}" - - client_class.new(host, port, identity) - end - end -end diff --git a/lib/temporal/configuration.rb b/lib/temporal/configuration.rb index af812a41..828c561f 100644 --- a/lib/temporal/configuration.rb +++ b/lib/temporal/configuration.rb @@ -1,15 +1,18 @@ require 'temporal/logger' require 'temporal/metrics_adapters/null' -require 'temporal/client/converter/payload/nil' -require 'temporal/client/converter/payload/bytes' -require 'temporal/client/converter/payload/json' -require 'temporal/client/converter/composite' +require 'temporal/connection/converter/payload/nil' +require 'temporal/connection/converter/payload/bytes' +require 'temporal/connection/converter/payload/json' +require 'temporal/connection/converter/composite' module Temporal class Configuration + Connection = Struct.new(:type, :host, :port, keyword_init: true) + Execution = Struct.new(:namespace, :task_queue, :timeouts, :headers, keyword_init: true) + attr_reader :timeouts, :error_handlers attr_writer :converter - attr_accessor :client_type, :host, :port, :logger, :metrics_adapter, :namespace, :task_queue, :headers + attr_accessor :connection_type, :host, :port, :logger, :metrics_adapter, :namespace, :task_queue, :headers # We want an infinite execution timeout for cron schedules and other perpetual workflows. # We choose an 10-year execution timeout because that's the maximum the cassandra DB supports, @@ -28,16 +31,16 @@ class Configuration DEFAULT_HEADERS = {}.freeze DEFAULT_NAMESPACE = 'default-namespace'.freeze DEFAULT_TASK_QUEUE = 'default-task-queue'.freeze - DEFAULT_CONVERTER = Temporal::Client::Converter::Composite.new( + DEFAULT_CONVERTER = Temporal::Connection::Converter::Composite.new( payload_converters: [ - Temporal::Client::Converter::Payload::Nil.new, - Temporal::Client::Converter::Payload::Bytes.new, - Temporal::Client::Converter::Payload::JSON.new, + Temporal::Connection::Converter::Payload::Nil.new, + Temporal::Connection::Converter::Payload::Bytes.new, + Temporal::Connection::Converter::Payload::JSON.new, ] ).freeze def initialize - @client_type = :grpc + @connection_type = :grpc @logger = Temporal::Logger.new(STDOUT, progname: 'temporal_client') @metrics_adapter = MetricsAdapters::Null.new @timeouts = DEFAULT_TIMEOUTS @@ -67,5 +70,22 @@ def timeouts=(new_timeouts) def converter @converter end + + def for_connection + Connection.new( + type: connection_type, + host: host, + port: port + ).freeze + end + + def default_execution_options + Execution.new( + namespace: namespace, + task_queue: task_list, + timeouts: timeouts, + headers: headers + ).freeze + end end end diff --git a/lib/temporal/connection.rb b/lib/temporal/connection.rb new file mode 100644 index 00000000..b499ca73 --- /dev/null +++ b/lib/temporal/connection.rb @@ -0,0 +1,21 @@ +require 'temporal/connection/grpc' + +module Temporal + module Connection + CLIENT_TYPES_MAP = { + grpc: Temporal::Connection::GRPC + }.freeze + + def self.generate(configuration) + connection_class = CLIENT_TYPES_MAP[configuration.type] + host = configuration.host + port = configuration.port + + hostname = `hostname` + thread_id = Thread.current.object_id + identity = "#{thread_id}@#{hostname}" + + connection_class.new(host, port, identity) + end + end +end diff --git a/lib/temporal/client/converter/base.rb b/lib/temporal/connection/converter/base.rb similarity index 97% rename from lib/temporal/client/converter/base.rb rename to lib/temporal/connection/converter/base.rb index b6c7f813..93b09b2b 100644 --- a/lib/temporal/client/converter/base.rb +++ b/lib/temporal/connection/converter/base.rb @@ -1,5 +1,5 @@ module Temporal - module Client + module Connection module Converter class Base def initialize(payload_converter:) diff --git a/lib/temporal/client/converter/composite.rb b/lib/temporal/connection/converter/composite.rb similarity index 94% rename from lib/temporal/client/converter/composite.rb rename to lib/temporal/connection/converter/composite.rb index 15289f39..2b0d2f11 100644 --- a/lib/temporal/client/converter/composite.rb +++ b/lib/temporal/connection/converter/composite.rb @@ -1,7 +1,7 @@ -require 'temporal/client/converter/base' +require 'temporal/connection/converter/base' module Temporal - module Client + module Connection module Converter class Composite < Base class ConverterNotFound < RuntimeError; end diff --git a/lib/temporal/client/converter/payload/bytes.rb b/lib/temporal/connection/converter/payload/bytes.rb similarity index 96% rename from lib/temporal/client/converter/payload/bytes.rb rename to lib/temporal/connection/converter/payload/bytes.rb index d9e85961..16b157c8 100644 --- a/lib/temporal/client/converter/payload/bytes.rb +++ b/lib/temporal/connection/converter/payload/bytes.rb @@ -1,7 +1,7 @@ require 'temporal/json' module Temporal - module Client + module Connection module Converter module Payload class Bytes diff --git a/lib/temporal/client/converter/payload/json.rb b/lib/temporal/connection/converter/payload/json.rb similarity index 96% rename from lib/temporal/client/converter/payload/json.rb rename to lib/temporal/connection/converter/payload/json.rb index 133cd0bf..0e1665e0 100644 --- a/lib/temporal/client/converter/payload/json.rb +++ b/lib/temporal/connection/converter/payload/json.rb @@ -1,7 +1,7 @@ require 'temporal/json' module Temporal - module Client + module Connection module Converter module Payload class JSON diff --git a/lib/temporal/client/converter/payload/nil.rb b/lib/temporal/connection/converter/payload/nil.rb similarity index 96% rename from lib/temporal/client/converter/payload/nil.rb rename to lib/temporal/connection/converter/payload/nil.rb index 6b25f761..7337520f 100644 --- a/lib/temporal/client/converter/payload/nil.rb +++ b/lib/temporal/connection/converter/payload/nil.rb @@ -1,5 +1,5 @@ module Temporal - module Client + module Connection module Converter module Payload class Nil diff --git a/lib/temporal/client/errors.rb b/lib/temporal/connection/errors.rb similarity index 58% rename from lib/temporal/client/errors.rb rename to lib/temporal/connection/errors.rb index 31efb483..f00d883d 100644 --- a/lib/temporal/client/errors.rb +++ b/lib/temporal/connection/errors.rb @@ -1,8 +1,8 @@ module Temporal - module Client + module Connection class Error < StandardError; end - # incorrect arguments passed to the client + # incorrect arguments passed to the connection class ArgumentError < Error; end end end diff --git a/lib/temporal/client/grpc_client.rb b/lib/temporal/connection/grpc.rb similarity index 98% rename from lib/temporal/client/grpc_client.rb rename to lib/temporal/connection/grpc.rb index db8075f8..848d2de0 100644 --- a/lib/temporal/client/grpc_client.rb +++ b/lib/temporal/connection/grpc.rb @@ -1,15 +1,15 @@ require 'grpc' require 'google/protobuf/well_known_types' require 'securerandom' -require 'temporal/client/errors' -require 'temporal/client/serializer' -require 'temporal/client/serializer/failure' +require 'temporal/connection/errors' +require 'temporal/connection/serializer' +require 'temporal/connection/serializer/failure' require 'gen/temporal/api/workflowservice/v1/service_services_pb' require 'temporal/concerns/payloads' module Temporal - module Client - class GRPCClient + module Connection + class GRPC include Concerns::Payloads WORKFLOW_ID_REUSE_POLICY = { @@ -41,7 +41,7 @@ def register_namespace(name:, description: nil, global: false, retention_period: ) ) client.register_namespace(request) - rescue GRPC::AlreadyExists => e + rescue ::GRPC::AlreadyExists => e raise Temporal::NamespaceAlreadyExistsFailure, e.details end @@ -112,7 +112,7 @@ def start_workflow_execution( end client.start_workflow_execution(request) - rescue GRPC::AlreadyExists => e + rescue ::GRPC::AlreadyExists => e # Feel like there should be cleaner way to do this... run_id = e.details[/RunId: ([a-f0-9]+-[a-f0-9]+-[a-f0-9]+-[a-f0-9]+-[a-f0-9]+)/, 1] raise Temporal::WorkflowExecutionAlreadyStartedFailure.new(e.details, run_id) diff --git a/lib/temporal/client/retryer.rb b/lib/temporal/connection/retryer.rb similarity index 98% rename from lib/temporal/client/retryer.rb rename to lib/temporal/connection/retryer.rb index fe925093..d70ba3b4 100644 --- a/lib/temporal/client/retryer.rb +++ b/lib/temporal/connection/retryer.rb @@ -1,5 +1,5 @@ module Temporal - module Client + module Connection module Retryer INITIAL_INTERVAL_S = 0.2 MAX_INTERVAL_S = 6.0 diff --git a/lib/temporal/client/serializer.rb b/lib/temporal/connection/serializer.rb similarity index 63% rename from lib/temporal/client/serializer.rb rename to lib/temporal/connection/serializer.rb index a8b19af2..98ce71b4 100644 --- a/lib/temporal/client/serializer.rb +++ b/lib/temporal/connection/serializer.rb @@ -1,16 +1,16 @@ require 'temporal/workflow/command' -require 'temporal/client/serializer/schedule_activity' -require 'temporal/client/serializer/start_child_workflow' -require 'temporal/client/serializer/request_activity_cancellation' -require 'temporal/client/serializer/record_marker' -require 'temporal/client/serializer/start_timer' -require 'temporal/client/serializer/cancel_timer' -require 'temporal/client/serializer/complete_workflow' -require 'temporal/client/serializer/continue_as_new' -require 'temporal/client/serializer/fail_workflow' +require 'temporal/connection/serializer/schedule_activity' +require 'temporal/connection/serializer/start_child_workflow' +require 'temporal/connection/serializer/request_activity_cancellation' +require 'temporal/connection/serializer/record_marker' +require 'temporal/connection/serializer/start_timer' +require 'temporal/connection/serializer/cancel_timer' +require 'temporal/connection/serializer/complete_workflow' +require 'temporal/connection/serializer/continue_as_new' +require 'temporal/connection/serializer/fail_workflow' module Temporal - module Client + module Connection module Serializer SERIALIZERS_MAP = { Workflow::Command::ScheduleActivity => Serializer::ScheduleActivity, diff --git a/lib/temporal/client/serializer/base.rb b/lib/temporal/connection/serializer/base.rb similarity index 95% rename from lib/temporal/client/serializer/base.rb rename to lib/temporal/connection/serializer/base.rb index e75be098..9fcd49c5 100644 --- a/lib/temporal/client/serializer/base.rb +++ b/lib/temporal/connection/serializer/base.rb @@ -3,7 +3,7 @@ require 'gen/temporal/api/command/v1/message_pb' module Temporal - module Client + module Connection module Serializer class Base def initialize(object) diff --git a/lib/temporal/client/serializer/cancel_timer.rb b/lib/temporal/connection/serializer/cancel_timer.rb similarity index 87% rename from lib/temporal/client/serializer/cancel_timer.rb rename to lib/temporal/connection/serializer/cancel_timer.rb index 4d18b6ec..a51eceb7 100644 --- a/lib/temporal/client/serializer/cancel_timer.rb +++ b/lib/temporal/connection/serializer/cancel_timer.rb @@ -1,7 +1,7 @@ -require 'temporal/client/serializer/base' +require 'temporal/connection/serializer/base' module Temporal - module Client + module Connection module Serializer class CancelTimer < Base def to_proto diff --git a/lib/temporal/client/serializer/complete_workflow.rb b/lib/temporal/connection/serializer/complete_workflow.rb similarity index 90% rename from lib/temporal/client/serializer/complete_workflow.rb rename to lib/temporal/connection/serializer/complete_workflow.rb index 3af3424e..f228dbee 100644 --- a/lib/temporal/client/serializer/complete_workflow.rb +++ b/lib/temporal/connection/serializer/complete_workflow.rb @@ -1,8 +1,8 @@ -require 'temporal/client/serializer/base' +require 'temporal/connection/serializer/base' require 'temporal/concerns/payloads' module Temporal - module Client + module Connection module Serializer class CompleteWorkflow < Base include Concerns::Payloads diff --git a/lib/temporal/client/serializer/continue_as_new.rb b/lib/temporal/connection/serializer/continue_as_new.rb similarity index 83% rename from lib/temporal/client/serializer/continue_as_new.rb rename to lib/temporal/connection/serializer/continue_as_new.rb index 4693031b..2d1e588c 100644 --- a/lib/temporal/client/serializer/continue_as_new.rb +++ b/lib/temporal/connection/serializer/continue_as_new.rb @@ -1,9 +1,9 @@ -require 'temporal/client/serializer/base' -require 'temporal/client/serializer/retry_policy' +require 'temporal/connection/serializer/base' +require 'temporal/connection/serializer/retry_policy' require 'temporal/concerns/payloads' module Temporal - module Client + module Connection module Serializer class ContinueAsNew < Base include Concerns::Payloads @@ -18,7 +18,7 @@ def to_proto input: to_payloads(object.input), workflow_run_timeout: object.timeouts[:execution], workflow_task_timeout: object.timeouts[:task], - retry_policy: Temporal::Client::Serializer::RetryPolicy.new(object.retry_policy).to_proto, + retry_policy: Temporal::Connection::Serializer::RetryPolicy.new(object.retry_policy).to_proto, header: serialize_headers(object.headers) ) ) diff --git a/lib/temporal/client/serializer/fail_workflow.rb b/lib/temporal/connection/serializer/fail_workflow.rb similarity index 89% rename from lib/temporal/client/serializer/fail_workflow.rb rename to lib/temporal/connection/serializer/fail_workflow.rb index 26994b41..0cc79725 100644 --- a/lib/temporal/client/serializer/fail_workflow.rb +++ b/lib/temporal/connection/serializer/fail_workflow.rb @@ -1,8 +1,8 @@ -require 'temporal/client/serializer/base' +require 'temporal/connection/serializer/base' require 'temporal/json' module Temporal - module Client + module Connection module Serializer class FailWorkflow < Base def to_proto diff --git a/lib/temporal/client/serializer/failure.rb b/lib/temporal/connection/serializer/failure.rb similarity index 91% rename from lib/temporal/client/serializer/failure.rb rename to lib/temporal/connection/serializer/failure.rb index ff9e389f..15dfc555 100644 --- a/lib/temporal/client/serializer/failure.rb +++ b/lib/temporal/connection/serializer/failure.rb @@ -1,8 +1,8 @@ -require 'temporal/client/serializer/base' +require 'temporal/connection/serializer/base' require 'temporal/concerns/payloads' module Temporal - module Client + module Connection module Serializer class Failure < Base include Concerns::Payloads diff --git a/lib/temporal/client/serializer/record_marker.rb b/lib/temporal/connection/serializer/record_marker.rb similarity index 90% rename from lib/temporal/client/serializer/record_marker.rb rename to lib/temporal/connection/serializer/record_marker.rb index bd205af6..133d79dc 100644 --- a/lib/temporal/client/serializer/record_marker.rb +++ b/lib/temporal/connection/serializer/record_marker.rb @@ -1,8 +1,8 @@ -require 'temporal/client/serializer/base' +require 'temporal/connection/serializer/base' require 'temporal/concerns/payloads' module Temporal - module Client + module Connection module Serializer class RecordMarker < Base include Concerns::Payloads diff --git a/lib/temporal/client/serializer/request_activity_cancellation.rb b/lib/temporal/connection/serializer/request_activity_cancellation.rb similarity index 89% rename from lib/temporal/client/serializer/request_activity_cancellation.rb rename to lib/temporal/connection/serializer/request_activity_cancellation.rb index 4196b73a..2cf51a65 100644 --- a/lib/temporal/client/serializer/request_activity_cancellation.rb +++ b/lib/temporal/connection/serializer/request_activity_cancellation.rb @@ -1,7 +1,7 @@ -require 'temporal/client/serializer/base' +require 'temporal/connection/serializer/base' module Temporal - module Client + module Connection module Serializer class RequestActivityCancellation < Base def to_proto diff --git a/lib/temporal/client/serializer/retry_policy.rb b/lib/temporal/connection/serializer/retry_policy.rb similarity index 90% rename from lib/temporal/client/serializer/retry_policy.rb rename to lib/temporal/connection/serializer/retry_policy.rb index 3370f1cf..58d42ab1 100644 --- a/lib/temporal/client/serializer/retry_policy.rb +++ b/lib/temporal/connection/serializer/retry_policy.rb @@ -1,7 +1,7 @@ -require 'temporal/client/serializer/base' +require 'temporal/connection/serializer/base' module Temporal - module Client + module Connection module Serializer class RetryPolicy < Base def to_proto diff --git a/lib/temporal/client/serializer/schedule_activity.rb b/lib/temporal/connection/serializer/schedule_activity.rb similarity index 85% rename from lib/temporal/client/serializer/schedule_activity.rb rename to lib/temporal/connection/serializer/schedule_activity.rb index 9c35e614..93d3a207 100644 --- a/lib/temporal/client/serializer/schedule_activity.rb +++ b/lib/temporal/connection/serializer/schedule_activity.rb @@ -1,9 +1,9 @@ -require 'temporal/client/serializer/base' -require 'temporal/client/serializer/retry_policy' +require 'temporal/connection/serializer/base' +require 'temporal/connection/serializer/retry_policy' require 'temporal/concerns/payloads' module Temporal - module Client + module Connection module Serializer class ScheduleActivity < Base include Concerns::Payloads @@ -22,7 +22,7 @@ def to_proto schedule_to_start_timeout: object.timeouts[:schedule_to_start], start_to_close_timeout: object.timeouts[:start_to_close], heartbeat_timeout: object.timeouts[:heartbeat], - retry_policy: Temporal::Client::Serializer::RetryPolicy.new(object.retry_policy).to_proto, + retry_policy: Temporal::Connection::Serializer::RetryPolicy.new(object.retry_policy).to_proto, header: serialize_headers(object.headers) ) ) diff --git a/lib/temporal/client/serializer/start_child_workflow.rb b/lib/temporal/connection/serializer/start_child_workflow.rb similarity index 85% rename from lib/temporal/client/serializer/start_child_workflow.rb rename to lib/temporal/connection/serializer/start_child_workflow.rb index 373ec2bd..55312e50 100644 --- a/lib/temporal/client/serializer/start_child_workflow.rb +++ b/lib/temporal/connection/serializer/start_child_workflow.rb @@ -1,9 +1,9 @@ -require 'temporal/client/serializer/base' -require 'temporal/client/serializer/retry_policy' +require 'temporal/connection/serializer/base' +require 'temporal/connection/serializer/retry_policy' require 'temporal/concerns/payloads' module Temporal - module Client + module Connection module Serializer class StartChildWorkflow < Base include Concerns::Payloads @@ -21,7 +21,7 @@ def to_proto workflow_execution_timeout: object.timeouts[:execution], workflow_run_timeout: object.timeouts[:run], workflow_task_timeout: object.timeouts[:task], - retry_policy: Temporal::Client::Serializer::RetryPolicy.new(object.retry_policy).to_proto, + retry_policy: Temporal::Connection::Serializer::RetryPolicy.new(object.retry_policy).to_proto, header: serialize_headers(object.headers) ) ) diff --git a/lib/temporal/client/serializer/start_timer.rb b/lib/temporal/connection/serializer/start_timer.rb similarity index 88% rename from lib/temporal/client/serializer/start_timer.rb rename to lib/temporal/connection/serializer/start_timer.rb index acbc60d6..9ec313ea 100644 --- a/lib/temporal/client/serializer/start_timer.rb +++ b/lib/temporal/connection/serializer/start_timer.rb @@ -1,7 +1,7 @@ -require 'temporal/client/serializer/base' +require 'temporal/connection/serializer/base' module Temporal - module Client + module Connection module Serializer class StartTimer < Base def to_proto diff --git a/lib/temporal/execution_options.rb b/lib/temporal/execution_options.rb index 3c8b679e..9fc4f956 100644 --- a/lib/temporal/execution_options.rb +++ b/lib/temporal/execution_options.rb @@ -4,7 +4,7 @@ module Temporal class ExecutionOptions attr_reader :name, :namespace, :task_queue, :retry_policy, :timeouts, :headers - def initialize(object, options = {}) + def initialize(object, options, defaults = nil) @name = options[:name] || object.to_s @namespace = options[:namespace] @task_queue = options[:task_queue] || options[:task_list] @@ -20,10 +20,12 @@ def initialize(object, options = {}) @headers = object.headers.merge(@headers) if object.headers end - @namespace ||= Temporal.configuration.namespace - @task_queue ||= Temporal.configuration.task_queue - @timeouts = Temporal.configuration.timeouts.merge(@timeouts) - @headers = Temporal.configuration.headers.merge(@headers) + if defaults + @namespace ||= defaults.namespace + @task_queue ||= defaults.task_queue + @timeouts = defaults.timeouts.merge(@timeouts) + @headers = defaults.headers.merge(@headers) + end freeze end diff --git a/lib/temporal/testing/local_workflow_context.rb b/lib/temporal/testing/local_workflow_context.rb index 3283a0f5..3642a7d3 100644 --- a/lib/temporal/testing/local_workflow_context.rb +++ b/lib/temporal/testing/local_workflow_context.rb @@ -11,7 +11,7 @@ module Testing class LocalWorkflowContext attr_reader :metadata - def initialize(execution, workflow_id, run_id, disabled_releases, metadata) + def initialize(execution, workflow_id, run_id, disabled_releases, metadata, config = Temporal.configuration) @last_event_id = 0 @execution = execution @run_id = run_id @@ -19,6 +19,7 @@ def initialize(execution, workflow_id, run_id, disabled_releases, metadata) @disabled_releases = disabled_releases @metadata = metadata @completed = false + @config = config end def completed? @@ -47,7 +48,7 @@ def execute_activity(activity_class, *input, **args) target = Workflow::History::EventTarget.new(event_id, Workflow::History::EventTarget::ACTIVITY_TYPE) future = Workflow::Future.new(target, self, cancelation_id: activity_id) - execution_options = ExecutionOptions.new(activity_class, options) + execution_options = ExecutionOptions.new(activity_class, options, config.default_execution_options) metadata = Metadata::Activity.new( namespace: execution_options.namespace, id: activity_id, @@ -94,7 +95,7 @@ def execute_local_activity(activity_class, *input, **args) options = args.delete(:options) || {} input << args unless args.empty? - execution_options = ExecutionOptions.new(activity_class, options) + execution_options = ExecutionOptions.new(activity_class, options, config.default_execution_options) activity_id = options[:activity_id] || SecureRandom.uuid metadata = Metadata::Activity.new( namespace: execution_options.namespace, @@ -124,7 +125,7 @@ def execute_workflow!(workflow_class, *input, **args) execution = WorkflowExecution.new workflow_id = SecureRandom.uuid run_id = SecureRandom.uuid - execution_options = ExecutionOptions.new(workflow_class, options) + execution_options = ExecutionOptions.new(workflow_class, options, config.default_execution_options) context = Temporal::Testing::LocalWorkflowContext.new( execution, workflow_id, run_id, workflow_class.disabled_releases, execution_options.headers ) @@ -187,7 +188,7 @@ def cancel(target, cancelation_id) private - attr_reader :execution, :run_id, :workflow_id, :disabled_releases + attr_reader :execution, :run_id, :workflow_id, :disabled_releases, :config def completed! @completed = true diff --git a/lib/temporal/worker.rb b/lib/temporal/worker.rb index b1e305e7..81881b67 100644 --- a/lib/temporal/worker.rb +++ b/lib/temporal/worker.rb @@ -1,4 +1,3 @@ -require 'temporal/client' require 'temporal/workflow/poller' require 'temporal/activity/poller' require 'temporal/execution_options' @@ -10,9 +9,11 @@ class Worker # activity_thread_pool_size: number of threads that the poller can use to run activities. # can be set to 1 if you want no paralellism in your activities, at the cost of throughput. def initialize( + config = Temporal.configuration, activity_thread_pool_size: Temporal::Activity::Poller::DEFAULT_OPTIONS[:thread_pool_size], workflow_thread_pool_size: Temporal::Workflow::Poller::DEFAULT_OPTIONS[:thread_pool_size] ) + @config = config @workflows = Hash.new { |hash, key| hash[key] = ExecutableLookup.new } @activities = Hash.new { |hash, key| hash[key] = ExecutableLookup.new } @pollers = [] @@ -28,14 +29,14 @@ def initialize( end def register_workflow(workflow_class, options = {}) - execution_options = ExecutionOptions.new(workflow_class, options) + execution_options = ExecutionOptions.new(workflow_class, options, config.default_execution_options) key = [execution_options.namespace, execution_options.task_queue] @workflows[key].add(execution_options.name, workflow_class) end def register_activity(activity_class, options = {}) - execution_options = ExecutionOptions.new(activity_class, options) + execution_options = ExecutionOptions.new(activity_class, options, config.default_execution_options) key = [execution_options.namespace, execution_options.task_queue] @activities[key].add(execution_options.name, activity_class) @@ -81,7 +82,7 @@ def stop private - attr_reader :activity_poller_options, :workflow_poller_options, + attr_reader :config, :activity_poller_options, :workflow_poller_options, :activities, :workflows, :pollers, :workflow_task_middleware, :activity_middleware @@ -90,11 +91,11 @@ def shutting_down? end def workflow_poller_for(namespace, task_queue, lookup) - Workflow::Poller.new(namespace, task_queue, lookup.freeze, workflow_task_middleware, workflow_poller_options) + Workflow::Poller.new(namespace, task_queue, lookup.freeze, config, workflow_task_middleware, workflow_poller_options) end def activity_poller_for(namespace, task_queue, lookup) - Activity::Poller.new(namespace, task_queue, lookup.freeze, activity_middleware, activity_poller_options) + Activity::Poller.new(namespace, task_queue, lookup.freeze, config, activity_middleware, activity_poller_options) end def trap_signals diff --git a/lib/temporal/workflow/context.rb b/lib/temporal/workflow/context.rb index f7dd8fd5..d4541840 100644 --- a/lib/temporal/workflow/context.rb +++ b/lib/temporal/workflow/context.rb @@ -17,12 +17,13 @@ class Workflow class Context attr_reader :metadata - def initialize(state_manager, dispatcher, workflow_class, metadata) + def initialize(state_manager, dispatcher, workflow_class, metadata, config) @state_manager = state_manager @dispatcher = dispatcher @workflow_class = workflow_class @metadata = metadata @completed = false + @config = config end def completed? @@ -47,7 +48,7 @@ def execute_activity(activity_class, *input, **args) options = args.delete(:options) || {} input << args unless args.empty? - execution_options = ExecutionOptions.new(activity_class, options) + execution_options = ExecutionOptions.new(activity_class, options, config.default_execution_options) command = Command::ScheduleActivity.new( activity_id: options[:activity_id], @@ -100,7 +101,7 @@ def execute_workflow(workflow_class, *input, **args) options = args.delete(:options) || {} input << args unless args.empty? - execution_options = ExecutionOptions.new(workflow_class, options) + execution_options = ExecutionOptions.new(workflow_class, options, config.default_execution_options) command = Command::StartChildWorkflow.new( workflow_id: options[:workflow_id] || SecureRandom.uuid, @@ -194,7 +195,7 @@ def continue_as_new(*input, **args) options = args.delete(:options) || {} input << args unless args.empty? - execution_options = ExecutionOptions.new(workflow_class, options) + execution_options = ExecutionOptions.new(workflow_class, options, config.default_execution_options) command = Command::ContinueAsNew.new( workflow_type: execution_options.name, @@ -257,7 +258,7 @@ def cancel(target, cancelation_id) private - attr_reader :state_manager, :dispatcher, :workflow_class + attr_reader :state_manager, :dispatcher, :workflow_class, :config def completed! @completed = true diff --git a/lib/temporal/workflow/executor.rb b/lib/temporal/workflow/executor.rb index a976af75..c81703cb 100644 --- a/lib/temporal/workflow/executor.rb +++ b/lib/temporal/workflow/executor.rb @@ -8,11 +8,12 @@ module Temporal class Workflow class Executor - def initialize(workflow_class, history) + def initialize(workflow_class, history, config) @workflow_class = workflow_class @dispatcher = Dispatcher.new @state_manager = StateManager.new(dispatcher) @history = history + @config = config end def run @@ -31,10 +32,10 @@ def run private - attr_reader :workflow_class, :dispatcher, :state_manager, :history + attr_reader :workflow_class, :dispatcher, :state_manager, :history, :config def execute_workflow(input, metadata) - context = Workflow::Context.new(state_manager, dispatcher, workflow_class, metadata) + context = Workflow::Context.new(state_manager, dispatcher, workflow_class, metadata, config) Fiber.new do workflow_class.execute_in_context(context, input) diff --git a/lib/temporal/workflow/poller.rb b/lib/temporal/workflow/poller.rb index ac5fb98e..f312d0f9 100644 --- a/lib/temporal/workflow/poller.rb +++ b/lib/temporal/workflow/poller.rb @@ -1,4 +1,4 @@ -require 'temporal/client' +require 'temporal/connection' require 'temporal/thread_pool' require 'temporal/middleware/chain' require 'temporal/workflow/task_processor' @@ -11,10 +11,11 @@ class Poller thread_pool_size: 10 }.freeze - def initialize(namespace, task_queue, workflow_lookup, middleware = [], options = {}) + def initialize(namespace, task_queue, workflow_lookup, config, middleware = [], options = {}) @namespace = namespace @task_queue = task_queue @workflow_lookup = workflow_lookup + @config = config @middleware = middleware @shutting_down = false @options = DEFAULT_OPTIONS.merge(options) @@ -31,7 +32,7 @@ def stop_polling end def cancel_pending_requests - client.cancel_polling_request + connection.cancel_polling_request end def wait @@ -41,10 +42,10 @@ def wait private - attr_reader :namespace, :task_queue, :workflow_lookup, :middleware, :options, :thread + attr_reader :namespace, :task_queue, :connection, :workflow_lookup, :config, :middleware, :options, :thread - def client - @client ||= Temporal::Client.generate + def connection + @connection ||= Temporal::Connection.generate(config.for_connection) end def shutting_down? @@ -73,7 +74,7 @@ def poll_loop end def poll_for_task - client.poll_workflow_task_queue(namespace: namespace, task_queue: task_queue) + connection.poll_workflow_task_queue(namespace: namespace, task_queue: task_queue) rescue StandardError => error Temporal.logger.error("Unable to poll Workflow task queue", { namespace: namespace, task_queue: task_queue, error: error.inspect }) Temporal::ErrorHandler.handle(error) @@ -84,7 +85,7 @@ def poll_for_task def process(task) middleware_chain = Middleware::Chain.new(middleware) - TaskProcessor.new(task, namespace, workflow_lookup, client, middleware_chain).process + TaskProcessor.new(task, namespace, workflow_lookup, middleware_chain, config).process end def thread_pool diff --git a/lib/temporal/workflow/task_processor.rb b/lib/temporal/workflow/task_processor.rb index df817c7f..fbeb6399 100644 --- a/lib/temporal/workflow/task_processor.rb +++ b/lib/temporal/workflow/task_processor.rb @@ -9,15 +9,15 @@ class Workflow class TaskProcessor MAX_FAILED_ATTEMPTS = 1 - def initialize(task, namespace, workflow_lookup, client, middleware_chain) + def initialize(task, namespace, workflow_lookup, middleware_chain, config) @task = task @namespace = namespace @metadata = Metadata.generate(Metadata::WORKFLOW_TASK_TYPE, task, namespace) @task_token = task.task_token @workflow_name = task.workflow_type.name @workflow_class = workflow_lookup.find(workflow_name) - @client = client @middleware_chain = middleware_chain + @config = config end def process @@ -32,7 +32,7 @@ def process history = fetch_full_history # TODO: For sticky workflows we need to cache the Executor instance - executor = Workflow::Executor.new(workflow_class, history) + executor = Workflow::Executor.new(workflow_class, history, config) commands = middleware_chain.invoke(metadata) do executor.run @@ -51,7 +51,12 @@ def process private - attr_reader :task, :namespace, :task_token, :workflow_name, :workflow_class, :client, :middleware_chain, :metadata + attr_reader :task, :namespace, :task_token, :workflow_name, :workflow_class, + :middleware_chain, :metadata, :config + + def connection + @connection ||= Temporal::Connection.generate(config.for_connection) + end def queue_time_ms scheduled = task.scheduled_time.to_f @@ -64,7 +69,7 @@ def fetch_full_history next_page_token = task.next_page_token while !next_page_token.empty? do - response = client.get_workflow_execution_history( + response = connection.get_workflow_execution_history( namespace: namespace, workflow_id: task.workflow_execution.workflow_id, run_id: task.workflow_execution.run_id, @@ -81,7 +86,7 @@ def fetch_full_history def complete_task(commands) Temporal.logger.info("Workflow task completed", metadata.to_h) - client.respond_workflow_task_completed(task_token: task_token, commands: commands) + connection.respond_workflow_task_completed(task_token: task_token, commands: commands) end def fail_task(error) @@ -93,7 +98,7 @@ def fail_task(error) # yet exponentially backoff on retries. return if task.attempt > MAX_FAILED_ATTEMPTS - client.respond_workflow_task_failed( + connection.respond_workflow_task_failed( task_token: task_token, cause: Temporal::Api::Enums::V1::WorkflowTaskFailedCause::WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_COMMAND, exception: error diff --git a/spec/unit/lib/temporal/activity/poller_spec.rb b/spec/unit/lib/temporal/activity/poller_spec.rb index 62c6489e..04de13ed 100644 --- a/spec/unit/lib/temporal/activity/poller_spec.rb +++ b/spec/unit/lib/temporal/activity/poller_spec.rb @@ -1,37 +1,39 @@ require 'temporal/activity/poller' require 'temporal/middleware/entry' +require 'temporal/configuration' describe Temporal::Activity::Poller do - let(:client) { instance_double('Temporal::Client::GRPCClient', cancel_polling_request: nil) } + let(:connection) { instance_double('Temporal::Connection::GRPC', cancel_polling_request: nil) } let(:namespace) { 'test-namespace' } let(:task_queue) { 'test-task-queue' } let(:lookup) { instance_double('Temporal::ExecutableLookup') } let(:thread_pool) do instance_double(Temporal::ThreadPool, wait_for_available_threads: nil, shutdown: nil) end + let(:config) { Temporal::Configuration.new } let(:middleware_chain) { instance_double(Temporal::Middleware::Chain) } let(:middleware) { [] } - subject { described_class.new(namespace, task_queue, lookup, middleware) } + subject { described_class.new(namespace, task_queue, lookup, config, middleware) } before do - allow(Temporal::Client).to receive(:generate).and_return(client) + allow(Temporal::Connection).to receive(:generate).and_return(connection) allow(Temporal::ThreadPool).to receive(:new).and_return(thread_pool) allow(Temporal::Middleware::Chain).to receive(:new).and_return(middleware_chain) allow(Temporal.metrics).to receive(:timing) end describe '#start' do - it 'polls for activity tasks' do + it 'measures time between polls' do allow(subject).to receive(:shutting_down?).and_return(false, false, true) - allow(client).to receive(:poll_activity_task_queue).and_return(nil) + allow(connection).to receive(:poll_activity_task_queue).and_return(nil) subject.start # stop poller before inspecting subject.stop_polling; subject.wait - expect(client) + expect(connection) .to have_received(:poll_activity_task_queue) .with(namespace: namespace, task_queue: task_queue) .twice @@ -39,7 +41,7 @@ it 'reports time since last poll' do allow(subject).to receive(:shutting_down?).and_return(false, false, true) - allow(client).to receive(:poll_activity_task_queue).and_return(nil) + allow(connection).to receive(:poll_activity_task_queue).and_return(nil) subject.start @@ -63,7 +65,7 @@ before do allow(subject).to receive(:shutting_down?).and_return(false, true) - allow(client).to receive(:poll_activity_task_queue).and_return(task) + allow(connection).to receive(:poll_activity_task_queue).and_return(task) allow(Temporal::Activity::TaskProcessor).to receive(:new).and_return(task_processor) allow(thread_pool).to receive(:schedule).and_yield end @@ -85,7 +87,7 @@ expect(Temporal::Activity::TaskProcessor) .to have_received(:new) - .with(task, namespace, lookup, client, middleware_chain) + .with(task, namespace, lookup, middleware_chain, config) expect(task_processor).to have_received(:process) end @@ -108,15 +110,15 @@ def call(_); end expect(Temporal::Middleware::Chain).to have_received(:new).with(middleware) expect(Temporal::Activity::TaskProcessor) .to have_received(:new) - .with(task, namespace, lookup, client, middleware_chain) + .with(task, namespace, lookup, middleware_chain, config) end end end - context 'when client is unable to poll' do + context 'when connection is unable to poll' do before do allow(subject).to receive(:shutting_down?).and_return(false, true) - allow(client).to receive(:poll_activity_task_queue).and_raise(StandardError) + allow(connection).to receive(:poll_activity_task_queue).and_raise(StandardError) end it 'logs' do @@ -138,11 +140,11 @@ def call(_); end before { subject.start } after { subject.wait } - it 'tells client to cancel polling requests' do + it 'tells connection to cancel polling requests' do subject.stop_polling subject.cancel_pending_requests - expect(client).to have_received(:cancel_polling_request) + expect(connection).to have_received(:cancel_polling_request) end end diff --git a/spec/unit/lib/temporal/activity/task_processor_spec.rb b/spec/unit/lib/temporal/activity/task_processor_spec.rb index 882f3d2b..91e1eccf 100644 --- a/spec/unit/lib/temporal/activity/task_processor_spec.rb +++ b/spec/unit/lib/temporal/activity/task_processor_spec.rb @@ -1,8 +1,9 @@ require 'temporal/activity/task_processor' require 'temporal/middleware/chain' +require 'temporal/configuration' describe Temporal::Activity::TaskProcessor do - subject { described_class.new(task, namespace, lookup, client, middleware_chain) } + subject { described_class.new(task, namespace, lookup, middleware_chain, config) } let(:namespace) { 'test-namespace' } let(:lookup) { instance_double('Temporal::ExecutableLookup', find: nil) } @@ -15,36 +16,41 @@ end let(:metadata) { Temporal::Metadata.generate(Temporal::Metadata::ACTIVITY_TYPE, task) } let(:activity_name) { 'TestActivity' } - let(:client) { instance_double('Temporal::Client::GRPCClient') } + let(:connection) { instance_double('Temporal::Connection::GRPC') } let(:middleware_chain) { Temporal::Middleware::Chain.new } + let(:config) { Temporal::Configuration.new } let(:input) { ['arg1', 'arg2'] } describe '#process' do let(:context) { instance_double('Temporal::Activity::Context', async?: false) } before do + allow(Temporal::Connection) + .to receive(:generate) + .with(config.for_connection) + .and_return(connection) allow(Temporal::Metadata) .to receive(:generate) .with(Temporal::Metadata::ACTIVITY_TYPE, task, namespace) .and_return(metadata) - allow(Temporal::Activity::Context).to receive(:new).with(client, metadata).and_return(context) + allow(Temporal::Activity::Context).to receive(:new).with(connection, metadata).and_return(context) - allow(client).to receive(:respond_activity_task_completed) - allow(client).to receive(:respond_activity_task_failed) + allow(connection).to receive(:respond_activity_task_completed) + allow(connection).to receive(:respond_activity_task_failed) allow(middleware_chain).to receive(:invoke).and_call_original allow(Temporal.metrics).to receive(:timing) # Skip sleeps during retries to speed up the test. - allow(Temporal::Client::Retryer).to receive(:sleep).and_return(nil) + allow(Temporal::Connection::Retryer).to receive(:sleep).and_return(nil) end context 'when activity is not registered' do it 'fails the activity task' do subject.process - expect(client) + expect(connection) .to have_received(:respond_activity_task_failed) .with( task_token: task.task_token, @@ -52,8 +58,8 @@ ) end - it 'ignores client exception' do - allow(client) + it 'ignores connection exception' do + allow(connection) .to receive(:respond_activity_task_failed) .and_raise(StandardError) @@ -101,13 +107,13 @@ it 'completes the activity task' do subject.process - expect(client) + expect(connection) .to have_received(:respond_activity_task_completed) .with(task_token: task.task_token, result: 'result') end - it 'ignores client exception' do - allow(client) + it 'ignores connection exception' do + allow(connection) .to receive(:respond_activity_task_completed) .and_raise(StandardError) @@ -136,7 +142,7 @@ it 'does not complete the activity task' do subject.process - expect(client).not_to have_received(:respond_activity_task_completed) + expect(connection).not_to have_received(:respond_activity_task_completed) end end end @@ -161,7 +167,7 @@ it 'fails the activity task' do subject.process - expect(client) + expect(connection) .to have_received(:respond_activity_task_failed) .with( task_token: task.task_token, @@ -169,8 +175,8 @@ ) end - it 'ignores client exception' do - allow(client) + it 'ignores connection exception' do + allow(connection) .to receive(:respond_activity_task_failed) .and_raise(StandardError) @@ -214,7 +220,7 @@ it 'fails the activity task' do subject.process - expect(client) + expect(connection) .to have_received(:respond_activity_task_failed) .with( task_token: task.task_token, @@ -229,7 +235,7 @@ it 'does not handle the exception' do expect { subject.process }.to raise_error(exception) - expect(client).not_to have_received(:respond_activity_task_failed) + expect(connection).not_to have_received(:respond_activity_task_failed) end end @@ -239,7 +245,7 @@ it 'fails the activity task' do subject.process - expect(client) + expect(connection) .to have_received(:respond_activity_task_failed) .with(task_token: task.task_token, exception: exception) end diff --git a/spec/unit/lib/temporal/client/converter/composite_spec.rb b/spec/unit/lib/temporal/connection/converter/composite_spec.rb similarity index 64% rename from spec/unit/lib/temporal/client/converter/composite_spec.rb rename to spec/unit/lib/temporal/connection/converter/composite_spec.rb index 7995ed73..9f74393b 100644 --- a/spec/unit/lib/temporal/client/converter/composite_spec.rb +++ b/spec/unit/lib/temporal/connection/converter/composite_spec.rb @@ -1,9 +1,9 @@ -require 'temporal/client/converter/payload/bytes' -require 'temporal/client/converter/payload/json' +require 'temporal/connection/converter/payload/bytes' +require 'temporal/connection/converter/payload/json' -describe Temporal::Client::Converter::Composite do - let(:bytes_converter) { Temporal::Client::Converter::Payload::Bytes.new } - let(:json_converter) { Temporal::Client::Converter::Payload::JSON.new } +describe Temporal::Connection::Converter::Composite do + let(:bytes_converter) { Temporal::Connection::Converter::Payload::Bytes.new } + let(:json_converter) { Temporal::Connection::Converter::Payload::JSON.new } subject { described_class.new(payload_converters: [bytes_converter, json_converter]) } @@ -11,11 +11,11 @@ it 'tries converters until it finds a match' do payloads = [ Temporal::Api::Common::V1::Payload.new( - metadata: { 'encoding' => Temporal::Client::Converter::Payload::Bytes::ENCODING }, + metadata: { 'encoding' => Temporal::Connection::Converter::Payload::Bytes::ENCODING }, data: 'test'.b ), Temporal::Api::Common::V1::Payload.new( - metadata: { 'encoding' => Temporal::Client::Converter::Payload::JSON::ENCODING }, + metadata: { 'encoding' => Temporal::Connection::Converter::Payload::JSON::ENCODING }, data: '"test"' ), ] @@ -33,11 +33,11 @@ it 'uses metadata to pick a converter' do payloads = [ Temporal::Api::Common::V1::Payload.new( - metadata: { 'encoding' => Temporal::Client::Converter::Payload::Bytes::ENCODING }, + metadata: { 'encoding' => Temporal::Connection::Converter::Payload::Bytes::ENCODING }, data: 'test'.b ), Temporal::Api::Common::V1::Payload.new( - metadata: { 'encoding' => Temporal::Client::Converter::Payload::JSON::ENCODING }, + metadata: { 'encoding' => Temporal::Connection::Converter::Payload::JSON::ENCODING }, data: '"test"' ), ] @@ -54,7 +54,7 @@ metadata: { 'encoding' => 'fake' } ) - expect { subject.from_payload(payload) }.to raise_error(Temporal::Client::Converter::Composite::ConverterNotFound) + expect { subject.from_payload(payload) }.to raise_error(Temporal::Connection::Converter::Composite::ConverterNotFound) end end end diff --git a/spec/unit/lib/temporal/client/converter/payload/bytes_spec.rb b/spec/unit/lib/temporal/connection/converter/payload/bytes_spec.rb similarity index 85% rename from spec/unit/lib/temporal/client/converter/payload/bytes_spec.rb rename to spec/unit/lib/temporal/connection/converter/payload/bytes_spec.rb index a2c8db22..8a9391fb 100644 --- a/spec/unit/lib/temporal/client/converter/payload/bytes_spec.rb +++ b/spec/unit/lib/temporal/connection/converter/payload/bytes_spec.rb @@ -1,6 +1,6 @@ -require 'temporal/client/converter/payload/bytes' +require 'temporal/connection/converter/payload/bytes' -describe Temporal::Client::Converter::Payload::Bytes do +describe Temporal::Connection::Converter::Payload::Bytes do subject { described_class.new } describe 'round trip' do diff --git a/spec/unit/lib/temporal/client/converter/payload/json_spec.rb b/spec/unit/lib/temporal/connection/converter/payload/json_spec.rb similarity index 82% rename from spec/unit/lib/temporal/client/converter/payload/json_spec.rb rename to spec/unit/lib/temporal/connection/converter/payload/json_spec.rb index f90acdb1..ebdb6ce4 100644 --- a/spec/unit/lib/temporal/client/converter/payload/json_spec.rb +++ b/spec/unit/lib/temporal/connection/converter/payload/json_spec.rb @@ -1,6 +1,6 @@ -require 'temporal/client/converter/payload/json' +require 'temporal/connection/converter/payload/json' -describe Temporal::Client::Converter::Payload::JSON do +describe Temporal::Connection::Converter::Payload::JSON do subject { described_class.new } describe 'round trip' do diff --git a/spec/unit/lib/temporal/client/converter/payload/nil_spec.rb b/spec/unit/lib/temporal/connection/converter/payload/nil_spec.rb similarity index 80% rename from spec/unit/lib/temporal/client/converter/payload/nil_spec.rb rename to spec/unit/lib/temporal/connection/converter/payload/nil_spec.rb index cac790a4..3779d27b 100644 --- a/spec/unit/lib/temporal/client/converter/payload/nil_spec.rb +++ b/spec/unit/lib/temporal/connection/converter/payload/nil_spec.rb @@ -1,6 +1,6 @@ -require 'temporal/client/converter/payload/nil' +require 'temporal/connection/converter/payload/nil' -describe Temporal::Client::Converter::Payload::Nil do +describe Temporal::Connection::Converter::Payload::Nil do subject { described_class.new } it 'encodes a null payload' do diff --git a/spec/unit/lib/temporal/client/retryer.rb b/spec/unit/lib/temporal/connection/retryer.rb similarity index 94% rename from spec/unit/lib/temporal/client/retryer.rb rename to spec/unit/lib/temporal/connection/retryer.rb index a33a29b8..91f0bb30 100644 --- a/spec/unit/lib/temporal/client/retryer.rb +++ b/spec/unit/lib/temporal/connection/retryer.rb @@ -1,11 +1,11 @@ -require 'temporal/client/retryer' +require 'temporal/connection/retryer' require 'temporal/metadata' require 'time' -describe Temporal::Client::Retryer do +describe Temporal::Connection::Retryer do before do # Skip sleeps during retries to speed up the test. - allow(Temporal::Client::Retryer).to receive(:sleep).and_return(nil) + allow(Temporal::Connection::Retryer).to receive(:sleep).and_return(nil) end it 'backs off and stops retrying eventually' do diff --git a/spec/unit/lib/temporal/client/serializer/continue_as_new_spec.rb b/spec/unit/lib/temporal/connection/serializer/continue_as_new_spec.rb similarity index 82% rename from spec/unit/lib/temporal/client/serializer/continue_as_new_spec.rb rename to spec/unit/lib/temporal/connection/serializer/continue_as_new_spec.rb index b65ca2e1..fbb00623 100644 --- a/spec/unit/lib/temporal/client/serializer/continue_as_new_spec.rb +++ b/spec/unit/lib/temporal/connection/serializer/continue_as_new_spec.rb @@ -1,7 +1,7 @@ -require 'temporal/client/serializer/continue_as_new' +require 'temporal/connection/serializer/continue_as_new' require 'temporal/workflow/command' -describe Temporal::Client::Serializer::ContinueAsNew do +describe Temporal::Connection::Serializer::ContinueAsNew do describe 'to_proto' do it 'produces a protobuf' do command = Temporal::Workflow::Command::ContinueAsNew.new( diff --git a/spec/unit/lib/temporal/client/serializer/failure_spec.rb b/spec/unit/lib/temporal/connection/serializer/failure_spec.rb similarity index 71% rename from spec/unit/lib/temporal/client/serializer/failure_spec.rb rename to spec/unit/lib/temporal/connection/serializer/failure_spec.rb index 9b2c4207..cff68c52 100644 --- a/spec/unit/lib/temporal/client/serializer/failure_spec.rb +++ b/spec/unit/lib/temporal/connection/serializer/failure_spec.rb @@ -1,7 +1,7 @@ -require 'temporal/client/serializer/failure' +require 'temporal/connection/serializer/failure' require 'temporal/workflow/command' -describe Temporal::Client::Serializer::Failure do +describe Temporal::Connection::Serializer::Failure do describe 'to_proto' do it 'produces a protobuf' do result = described_class.new(StandardError.new('test')).to_proto diff --git a/spec/unit/lib/temporal/client/serializer/retry_policy_spec.rb b/spec/unit/lib/temporal/connection/serializer/retry_policy_spec.rb similarity index 85% rename from spec/unit/lib/temporal/client/serializer/retry_policy_spec.rb rename to spec/unit/lib/temporal/connection/serializer/retry_policy_spec.rb index bce1a792..211f807f 100644 --- a/spec/unit/lib/temporal/client/serializer/retry_policy_spec.rb +++ b/spec/unit/lib/temporal/connection/serializer/retry_policy_spec.rb @@ -1,7 +1,7 @@ require 'temporal/retry_policy' -require 'temporal/client/serializer/retry_policy' +require 'temporal/connection/serializer/retry_policy' -describe Temporal::Client::Serializer::RetryPolicy do +describe Temporal::Connection::Serializer::RetryPolicy do describe 'to_proto' do let(:example_policy) do Temporal::RetryPolicy.new( diff --git a/spec/unit/lib/temporal/execution_options_spec.rb b/spec/unit/lib/temporal/execution_options_spec.rb index 80447447..0c13d6ec 100644 --- a/spec/unit/lib/temporal/execution_options_spec.rb +++ b/spec/unit/lib/temporal/execution_options_spec.rb @@ -1,27 +1,30 @@ require 'temporal/execution_options' +require 'temporal/configuration' describe Temporal::ExecutionOptions do + let(:config) { Temporal::Configuration.new } + class TestExecutionOptionsWorkflow < Temporal::Workflow namespace 'custom-namespace' end describe '#initialize' do it 'accepts a workflow class' do - execution_options = Temporal::ExecutionOptions.new(TestExecutionOptionsWorkflow) + execution_options = Temporal::ExecutionOptions.new(TestExecutionOptionsWorkflow, {}, config.default_execution_options) expect(execution_options.name).to eq('TestExecutionOptionsWorkflow') expect(execution_options.namespace).to eq('custom-namespace') end it 'accepts a workflow name as a string' do - execution_options = Temporal::ExecutionOptions.new('TestExecutionOptionsWorkflow') + execution_options = Temporal::ExecutionOptions.new('TestExecutionOptionsWorkflow', {}, config.default_execution_options) expect(execution_options.name).to eq('TestExecutionOptionsWorkflow') expect(execution_options.namespace).to eq('default-namespace') end it 'accepts a workflow name as a frozen string' do - execution_options = Temporal::ExecutionOptions.new('TestExecutionOptionsWorkflow'.freeze) + execution_options = Temporal::ExecutionOptions.new('TestExecutionOptionsWorkflow'.freeze, {}, config.default_execution_options) expect(execution_options.name).to eq('TestExecutionOptionsWorkflow') expect(execution_options.namespace).to eq('default-namespace') diff --git a/spec/unit/lib/temporal/grpc_client_spec.rb b/spec/unit/lib/temporal/grpc_client_spec.rb index b964aa0d..bcc0458d 100644 --- a/spec/unit/lib/temporal/grpc_client_spec.rb +++ b/spec/unit/lib/temporal/grpc_client_spec.rb @@ -1,5 +1,5 @@ -describe Temporal::Client::GRPCClient do - subject { Temporal::Client::GRPCClient.new(nil, nil, nil) } +describe Temporal::Connection::GRPC do + subject { Temporal::Connection::GRPC.new(nil, nil, nil) } let(:grpc_stub) { double('grpc stub') } let(:namespace) { 'test-namespace' } let(:workflow_id) { SecureRandom.uuid } diff --git a/spec/unit/lib/temporal/testing/temporal_override_spec.rb b/spec/unit/lib/temporal/testing/temporal_override_spec.rb index a6b08cbf..10535055 100644 --- a/spec/unit/lib/temporal/testing/temporal_override_spec.rb +++ b/spec/unit/lib/temporal/testing/temporal_override_spec.rb @@ -12,18 +12,18 @@ def execute; end context 'when testing mode is disabled' do describe 'Temporal.start_workflow' do - let(:client) { instance_double('Temporal::Client::GRPCClient') } + let(:connection) { instance_double('Temporal::Connection::GRPC') } let(:response) { Temporal::Api::WorkflowService::V1::StartWorkflowExecutionResponse.new(run_id: 'xxx') } - before { allow(Temporal::Client).to receive(:generate).and_return(client) } - after { Temporal.remove_instance_variable(:@client) rescue NameError } + before { allow(Temporal::Connection).to receive(:generate).and_return(connection) } + after { Temporal.remove_instance_variable(:@connection) rescue NameError } it 'invokes original implementation' do - allow(client).to receive(:start_workflow_execution).and_return(response) + allow(connection).to receive(:start_workflow_execution).and_return(response) Temporal.start_workflow(TestTemporalOverrideWorkflow) - expect(client) + expect(connection) .to have_received(:start_workflow_execution) .with(hash_including(workflow_name: 'TestTemporalOverrideWorkflow')) end diff --git a/spec/unit/lib/temporal/worker_spec.rb b/spec/unit/lib/temporal/worker_spec.rb index c70be334..d986958a 100644 --- a/spec/unit/lib/temporal/worker_spec.rb +++ b/spec/unit/lib/temporal/worker_spec.rb @@ -1,8 +1,12 @@ require 'temporal/worker' require 'temporal/workflow' require 'temporal/activity' +require 'temporal/configuration' describe Temporal::Worker do + subject { described_class.new(config) } + let(:config) { Temporal::Configuration.new } + class TestWorkerWorkflow < Temporal::Workflow namespace 'default-namespace' task_queue 'default-task-queue' @@ -138,6 +142,7 @@ class TestWorkerActivity < Temporal::Activity 'default-namespace', 'default-task-queue', an_instance_of(Temporal::ExecutableLookup), + config, [], thread_pool_size: 10 ) @@ -149,6 +154,7 @@ class TestWorkerActivity < Temporal::Activity 'other-namespace', 'default-task-queue', an_instance_of(Temporal::ExecutableLookup), + config, [], thread_pool_size: 10 ) @@ -160,6 +166,7 @@ class TestWorkerActivity < Temporal::Activity 'default-namespace', 'default-task-queue', an_instance_of(Temporal::ExecutableLookup), + config, [], thread_pool_size: 20 ) @@ -171,6 +178,7 @@ class TestWorkerActivity < Temporal::Activity 'default-namespace', 'other-task-queue', an_instance_of(Temporal::ExecutableLookup), + config, [], thread_pool_size: 20 ) @@ -193,7 +201,14 @@ class TestWorkerActivity < Temporal::Activity activity_poller = instance_double(Temporal::Activity::Poller, start: nil) expect(Temporal::Activity::Poller) .to receive(:new) - .with('default-namespace', 'default-task-queue', an_instance_of(Temporal::ExecutableLookup), [], {thread_pool_size: 10}) + .with( + 'default-namespace', + 'default-task-queue', + an_instance_of(Temporal::ExecutableLookup), + an_instance_of(Temporal::Configuration), + [], + {thread_pool_size: 10} + ) .and_return(activity_poller) worker = Temporal::Worker.new(activity_thread_pool_size: 10) @@ -235,6 +250,7 @@ class TestWorkerActivity < Temporal::Activity 'default-namespace', 'default-task-queue', an_instance_of(Temporal::ExecutableLookup), + config, [entry_1], thread_pool_size: 10 ) @@ -246,6 +262,7 @@ class TestWorkerActivity < Temporal::Activity 'default-namespace', 'default-task-queue', an_instance_of(Temporal::ExecutableLookup), + config, [entry_2], thread_pool_size: 20 ) diff --git a/spec/unit/lib/temporal/workflow/poller_spec.rb b/spec/unit/lib/temporal/workflow/poller_spec.rb index 4c306180..7f907f69 100644 --- a/spec/unit/lib/temporal/workflow/poller_spec.rb +++ b/spec/unit/lib/temporal/workflow/poller_spec.rb @@ -1,18 +1,20 @@ require 'temporal/workflow/poller' require 'temporal/middleware/entry' +require 'temporal/configuration' describe Temporal::Workflow::Poller do - let(:client) { instance_double('Temporal::Client::GRPCClient') } + let(:connection) { instance_double('Temporal::Connection::GRPC') } let(:namespace) { 'test-namespace' } let(:task_queue) { 'test-task-queue' } let(:lookup) { instance_double('Temporal::ExecutableLookup') } + let(:config) { Temporal::Configuration.new } let(:middleware_chain) { instance_double(Temporal::Middleware::Chain) } let(:middleware) { [] } - subject { described_class.new(namespace, task_queue, lookup, middleware) } + subject { described_class.new(namespace, task_queue, lookup, config, middleware) } before do - allow(Temporal::Client).to receive(:generate).and_return(client) + allow(Temporal::Connection).to receive(:generate).and_return(connection) allow(Temporal::Middleware::Chain).to receive(:new).and_return(middleware_chain) allow(Temporal.metrics).to receive(:timing) end @@ -20,14 +22,14 @@ describe '#start' do it 'polls for decision tasks' do allow(subject).to receive(:shutting_down?).and_return(false, false, true) - allow(client).to receive(:poll_workflow_task_queue).and_return(nil) + allow(connection).to receive(:poll_workflow_task_queue).and_return(nil) subject.start # stop poller before inspecting subject.stop_polling; subject.wait - expect(client) + expect(connection) .to have_received(:poll_workflow_task_queue) .with(namespace: namespace, task_queue: task_queue) .twice @@ -35,7 +37,7 @@ it 'reports time since last poll' do allow(subject).to receive(:shutting_down?).and_return(false, false, true) - allow(client).to receive(:poll_workflow_task_queue).and_return(nil) + allow(connection).to receive(:poll_workflow_task_queue).and_return(nil) subject.start @@ -61,7 +63,7 @@ before do allow(subject).to receive(:shutting_down?).and_return(false, true) - allow(client).to receive(:poll_workflow_task_queue).and_return(task) + allow(connection).to receive(:poll_workflow_task_queue).and_return(task) allow(Temporal::Workflow::TaskProcessor).to receive(:new).and_return(task_processor) end @@ -73,7 +75,7 @@ expect(Temporal::Workflow::TaskProcessor) .to have_received(:new) - .with(task, namespace, lookup, client, middleware_chain) + .with(task, namespace, lookup, middleware_chain, config) expect(task_processor).to have_received(:process) end @@ -96,15 +98,15 @@ def call(_); end expect(Temporal::Middleware::Chain).to have_received(:new).with(middleware) expect(Temporal::Workflow::TaskProcessor) .to have_received(:new) - .with(task, namespace, lookup, client, middleware_chain) + .with(task, namespace, lookup, middleware_chain, config) end end end - context 'when client is unable to poll' do + context 'when connection is unable to poll' do before do allow(subject).to receive(:shutting_down?).and_return(false, true) - allow(client).to receive(:poll_workflow_task_queue).and_raise(StandardError) + allow(connection).to receive(:poll_workflow_task_queue).and_raise(StandardError) end it 'logs' do diff --git a/spec/unit/lib/temporal/workflow/task_processor_spec.rb b/spec/unit/lib/temporal/workflow/task_processor_spec.rb index ef633a74..e8f4dc69 100644 --- a/spec/unit/lib/temporal/workflow/task_processor_spec.rb +++ b/spec/unit/lib/temporal/workflow/task_processor_spec.rb @@ -1,8 +1,9 @@ require 'temporal/workflow/task_processor' require 'temporal/middleware/chain' +require 'temporal/configuration' describe Temporal::Workflow::TaskProcessor do - subject { described_class.new(task, namespace, lookup, client, middleware_chain) } + subject { described_class.new(task, namespace, lookup, middleware_chain, config) } let(:namespace) { 'test-namespace' } let(:lookup) { instance_double('Temporal::ExecutableLookup', find: nil) } @@ -13,16 +14,21 @@ ) end let(:workflow_name) { 'TestWorkflow' } - let(:client) { instance_double('Temporal::Client::GRPCClient') } + let(:connection) { instance_double('Temporal::Connection::GRPC') } let(:middleware_chain) { Temporal::Middleware::Chain.new } let(:input) { ['arg1', 'arg2'] } + let(:config) { Temporal::Configuration.new } describe '#process' do let(:context) { instance_double('Temporal::Workflow::Context') } before do - allow(client).to receive(:respond_workflow_task_completed) - allow(client).to receive(:respond_workflow_task_failed) + allow(Temporal::Connection) + .to receive(:generate) + .with(config.for_connection) + .and_return(connection) + allow(connection).to receive(:respond_workflow_task_completed) + allow(connection).to receive(:respond_workflow_task_failed) allow(middleware_chain).to receive(:invoke).and_call_original @@ -30,8 +36,8 @@ end context 'when workflow is not registered' do - it 'ignores client exception' do - allow(client) + it 'ignores connection exception' do + allow(connection) .to receive(:respond_workflow_task_failed) .and_raise(StandardError) @@ -86,13 +92,13 @@ it 'completes the workflow task' do subject.process - expect(client) + expect(connection) .to have_received(:respond_workflow_task_completed) .with(task_token: task.task_token, commands: commands) end - it 'ignores client exception' do - allow(client) + it 'ignores connection exception' do + allow(connection) .to receive(:respond_workflow_task_completed) .and_raise(StandardError) @@ -124,7 +130,7 @@ it 'fails the workflow task' do subject.process - expect(client) + expect(connection) .to have_received(:respond_workflow_task_failed) .with( task_token: task.task_token, @@ -137,12 +143,12 @@ task.attempt = 2 subject.process - expect(client) + expect(connection) .not_to have_received(:respond_workflow_task_failed) end - it 'ignores client exception' do - allow(client) + it 'ignores connection exception' do + allow(connection) .to receive(:respond_workflow_task_failed) .and_raise(StandardError) diff --git a/spec/unit/lib/temporal_spec.rb b/spec/unit/lib/temporal_spec.rb index 8e245a76..b773626a 100644 --- a/spec/unit/lib/temporal_spec.rb +++ b/spec/unit/lib/temporal_spec.rb @@ -1,24 +1,25 @@ require 'temporal' require 'temporal/workflow' +require 'temporal/connection/grpc' describe Temporal do describe 'client operations' do - let(:client) { instance_double(Temporal::Client::GRPCClient) } + let(:connection) { instance_double(Temporal::Connection::GRPC) } class TestStartWorkflow < Temporal::Workflow namespace 'default-test-namespace' task_queue 'default-test-task-queue' end - before { allow(Temporal::Client).to receive(:generate).and_return(client) } - after { described_class.remove_instance_variable(:@client) rescue NameError } + before { allow(Temporal::Connection).to receive(:generate).and_return(connection) } + after { described_class.remove_instance_variable(:@connection) rescue NameError } describe '.start_workflow' do let(:temporal_response) do Temporal::Api::WorkflowService::V1::StartWorkflowExecutionResponse.new(run_id: 'xxx') end - before { allow(client).to receive(:start_workflow_execution).and_return(temporal_response) } + before { allow(connection).to receive(:start_workflow_execution).and_return(temporal_response) } context 'using a workflow class' do it 'returns run_id' do @@ -30,7 +31,7 @@ class TestStartWorkflow < Temporal::Workflow it 'starts a workflow using the default options' do described_class.start_workflow(TestStartWorkflow, 42) - expect(client) + expect(connection) .to have_received(:start_workflow_execution) .with( namespace: 'default-test-namespace', @@ -58,7 +59,7 @@ class TestStartWorkflow < Temporal::Workflow } ) - expect(client) + expect(connection) .to have_received(:start_workflow_execution) .with( namespace: 'test-namespace', @@ -83,7 +84,7 @@ class TestStartWorkflow < Temporal::Workflow options: { name: 'test-workflow' } ) - expect(client) + expect(connection) .to have_received(:start_workflow_execution) .with( namespace: 'default-test-namespace', @@ -102,7 +103,7 @@ class TestStartWorkflow < Temporal::Workflow it 'starts a workflow using specified workflow_id' do described_class.start_workflow(TestStartWorkflow, 42, options: { workflow_id: '123' }) - expect(client) + expect(connection) .to have_received(:start_workflow_execution) .with( namespace: 'default-test-namespace', @@ -123,7 +124,7 @@ class TestStartWorkflow < Temporal::Workflow TestStartWorkflow, 42, options: { workflow_id_reuse_policy: :allow } ) - expect(client) + expect(connection) .to have_received(:start_workflow_execution) .with( namespace: 'default-test-namespace', @@ -148,7 +149,7 @@ class TestStartWorkflow < Temporal::Workflow options: { namespace: 'test-namespace', task_queue: 'test-task-queue' } ) - expect(client) + expect(connection) .to have_received(:start_workflow_execution) .with( namespace: 'test-namespace', @@ -171,12 +172,12 @@ class TestStartWorkflow < Temporal::Workflow Temporal::Api::WorkflowService::V1::TerminateWorkflowExecutionResponse.new end - before { allow(client).to receive(:terminate_workflow_execution).and_return(temporal_response) } + before { allow(connection).to receive(:terminate_workflow_execution).and_return(temporal_response) } it 'terminates a workflow' do described_class.terminate_workflow('my-workflow', reason: 'just stop it') - expect(client) + expect(connection) .to have_received(:terminate_workflow_execution) .with( namespace: 'default-namespace', @@ -193,12 +194,12 @@ class TestStartWorkflow < Temporal::Workflow Temporal::Api::WorkflowService::V1::StartWorkflowExecutionResponse.new(run_id: 'xxx') end - before { allow(client).to receive(:start_workflow_execution).and_return(temporal_response) } + before { allow(connection).to receive(:start_workflow_execution).and_return(temporal_response) } it 'starts a cron workflow' do described_class.schedule_workflow(TestStartWorkflow, '* * * * *', 42) - expect(client) + expect(connection) .to have_received(:start_workflow_execution) .with( namespace: 'default-test-namespace', @@ -217,12 +218,12 @@ class TestStartWorkflow < Temporal::Workflow end describe '.register_namespace' do - before { allow(client).to receive(:register_namespace).and_return(nil) } + before { allow(connection).to receive(:register_namespace).and_return(nil) } it 'registers namespace with the specified name' do described_class.register_namespace('new-namespace') - expect(client) + expect(connection) .to have_received(:register_namespace) .with(name: 'new-namespace', description: nil) end @@ -230,7 +231,7 @@ class TestStartWorkflow < Temporal::Workflow it 'registers namespace with the specified name and description' do described_class.register_namespace('new-namespace', 'namespace description') - expect(client) + expect(connection) .to have_received(:register_namespace) .with(name: 'new-namespace', description: 'namespace description') end @@ -244,12 +245,12 @@ class TestStartWorkflow < Temporal::Workflow end let(:api_info) { Fabricate(:api_workflow_execution_info) } - before { allow(client).to receive(:describe_workflow_execution).and_return(response) } + before { allow(connection).to receive(:describe_workflow_execution).and_return(response) } it 'requests execution info from Temporal' do described_class.fetch_workflow_execution_info('namespace', '111', '222') - expect(client) + expect(connection) .to have_received(:describe_workflow_execution) .with(namespace: 'namespace', workflow_id: '111', run_id: '222') end @@ -266,12 +267,12 @@ class TestStartWorkflow < Temporal::Workflow Temporal::Api::WorkflowService::V1::ResetWorkflowExecutionResponse.new(run_id: 'xxx') end - before { allow(client).to receive(:reset_workflow_execution).and_return(temporal_response) } + before { allow(connection).to receive(:reset_workflow_execution).and_return(temporal_response) } context 'when workflow_task_id is provided' do let(:workflow_task_id) { 42 } - it 'calls client reset_workflow_execution' do + it 'calls connection reset_workflow_execution' do described_class.reset_workflow( 'default-test-namespace', '123', @@ -280,7 +281,7 @@ class TestStartWorkflow < Temporal::Workflow reason: 'Test reset' ) - expect(client).to have_received(:reset_workflow_execution).with( + expect(connection).to have_received(:reset_workflow_execution).with( namespace: 'default-test-namespace', workflow_id: '123', run_id: '1234', @@ -312,12 +313,12 @@ class TestStartWorkflow < Temporal::Workflow end describe '.complete_activity' do - before { allow(client).to receive(:respond_activity_task_completed_by_id).and_return(nil) } + before { allow(connection).to receive(:respond_activity_task_completed_by_id).and_return(nil) } it 'completes activity with a result' do described_class.complete_activity(async_token, 'all work completed') - expect(client) + expect(connection) .to have_received(:respond_activity_task_completed_by_id) .with( namespace: namespace, @@ -331,7 +332,7 @@ class TestStartWorkflow < Temporal::Workflow it 'completes activity without a result' do described_class.complete_activity(async_token) - expect(client) + expect(connection) .to have_received(:respond_activity_task_completed_by_id) .with( namespace: namespace, @@ -344,13 +345,13 @@ class TestStartWorkflow < Temporal::Workflow end describe '.fail_activity' do - before { allow(client).to receive(:respond_activity_task_failed_by_id).and_return(nil) } + before { allow(connection).to receive(:respond_activity_task_failed_by_id).and_return(nil) } it 'fails activity with a provided error' do exception = StandardError.new('something went wrong') described_class.fail_activity(async_token, exception) - expect(client) + expect(connection) .to have_received(:respond_activity_task_failed_by_id) .with( namespace: namespace, @@ -376,7 +377,7 @@ class NamespacedWorkflow < Temporal::Workflow completed_event = Fabricate(:workflow_completed_event, result: nil) response = Fabricate(:workflow_execution_history, events: [completed_event]) - expect(client) + expect(connection) .to receive(:get_workflow_execution_history) .with( namespace: 'some-namespace', @@ -399,7 +400,7 @@ class NamespacedWorkflow < Temporal::Workflow completed_event = Fabricate(:workflow_completed_event, result: nil) response = Fabricate(:workflow_execution_history, events: [completed_event]) - expect(client) + expect(connection) .to receive(:get_workflow_execution_history) .with( namespace: 'some-other-namespace', @@ -433,7 +434,7 @@ class NamespacedWorkflow < Temporal::Workflow ) completed_event = Fabricate(:workflow_completed_event, result: payload) response = Fabricate(:workflow_execution_history, events: [completed_event]) - expect(client) + expect(connection) .to receive(:get_workflow_execution_history) .with( namespace: 'default-test-namespace', @@ -459,7 +460,7 @@ class NamespacedWorkflow < Temporal::Workflow completed_event = Fabricate(:workflow_canceled_event) response = Fabricate(:workflow_execution_history, events: [completed_event]) - expect(client) + expect(connection) .to receive(:get_workflow_execution_history) .with( namespace: 'default-test-namespace', @@ -482,7 +483,7 @@ class NamespacedWorkflow < Temporal::Workflow it 'raises TimeoutError when the server times out' do response = Fabricate(:workflow_execution_history, events: []) - expect(client) + expect(connection) .to receive(:get_workflow_execution_history) .with( namespace: 'default-test-namespace',