diff --git a/lib/cadence.rb b/lib/cadence.rb index 002d97b1..059ddc14 100644 --- a/lib/cadence.rb +++ b/lib/cadence.rb @@ -1,7 +1,7 @@ require 'securerandom' require 'cadence/configuration' require 'cadence/execution_options' -require 'cadence/client' +require 'cadence/connection' require 'cadence/activity' require 'cadence/activity/async_token' require 'cadence/workflow' @@ -15,10 +15,10 @@ def start_workflow(workflow, *input, **args) options = args.delete(:options) || {} input << args unless args.empty? - execution_options = ExecutionOptions.new(workflow, options) + execution_options = ExecutionOptions.new(workflow, options, config.default_execution_options) workflow_id = options[:workflow_id] || SecureRandom.uuid - response = client.start_workflow_execution( + response = connection.start_workflow_execution( domain: execution_options.domain, workflow_id: workflow_id, workflow_name: execution_options.name, @@ -37,10 +37,10 @@ def schedule_workflow(workflow, cron_schedule, *input, **args) options = args.delete(:options) || {} input << args unless args.empty? - execution_options = ExecutionOptions.new(workflow, options) + execution_options = ExecutionOptions.new(workflow, options, config.default_execution_options) workflow_id = options[:workflow_id] || SecureRandom.uuid - response = client.start_workflow_execution( + response = connection.start_workflow_execution( domain: execution_options.domain, workflow_id: workflow_id, workflow_name: execution_options.name, @@ -57,13 +57,13 @@ def schedule_workflow(workflow, cron_schedule, *input, **args) end def register_domain(name, description = nil) - client.register_domain(name: name, description: description) + connection.register_domain(name: name, description: description) rescue CadenceThrift::DomainAlreadyExistsError nil end def signal_workflow(workflow, signal, workflow_id, run_id, input = nil) - client.signal_workflow_execution( + connection.signal_workflow_execution( domain: workflow.domain, # TODO: allow passing domain instead workflow_id: workflow_id, run_id: run_id, @@ -76,7 +76,7 @@ def reset_workflow(domain, workflow_id, run_id, decision_task_id: nil, reason: ' decision_task_id ||= get_last_completed_decision_task(domain, workflow_id, run_id) raise Error, 'Could not find a completed decision task event' unless decision_task_id - response = client.reset_workflow_execution( + response = connection.reset_workflow_execution( domain: domain, workflow_id: workflow_id, run_id: run_id, @@ -88,7 +88,7 @@ def reset_workflow(domain, workflow_id, run_id, decision_task_id: nil, reason: ' end def terminate_workflow(domain, workflow_id, run_id, reason: 'manual termination', details: nil) - client.terminate_workflow_execution( + connection.terminate_workflow_execution( domain: domain, workflow_id: workflow_id, run_id: run_id, @@ -98,7 +98,7 @@ def terminate_workflow(domain, workflow_id, run_id, reason: 'manual termination' end def fetch_workflow_execution_info(domain, workflow_id, run_id) - response = client.describe_workflow_execution( + response = connection.describe_workflow_execution( domain: domain, workflow_id: workflow_id, run_id: run_id @@ -110,7 +110,7 @@ def fetch_workflow_execution_info(domain, workflow_id, run_id) def complete_activity(async_token, result = nil) details = Activity::AsyncToken.decode(async_token) - client.respond_activity_task_completed_by_id( + connection.respond_activity_task_completed_by_id( domain: details.domain, activity_id: details.activity_id, workflow_id: details.workflow_id, @@ -122,7 +122,7 @@ def complete_activity(async_token, result = nil) def fail_activity(async_token, error) details = Activity::AsyncToken.decode(async_token) - client.respond_activity_task_failed_by_id( + connection.respond_activity_task_failed_by_id( domain: details.domain, activity_id: details.activity_id, workflow_id: details.workflow_id, @@ -133,23 +133,24 @@ def fail_activity(async_token, error) end def configure(&block) - yield configuration + yield config end def configuration - @configuration ||= Configuration.new + warn '[DEPRECATION] This method is now deprecated without a substitution' + config end def logger - configuration.logger + config.logger end def metrics - @metrics ||= Metrics.new(configuration.metrics_adapter) + @metrics ||= Metrics.new(config.metrics_adapter) end def get_workflow_history(domain:, workflow_id:, run_id:) - history_response = client.get_workflow_execution_history( + history_response = connection.get_workflow_execution_history( domain: domain, workflow_id: workflow_id, run_id: run_id @@ -159,8 +160,12 @@ def get_workflow_history(domain:, workflow_id:, run_id:) private - def client - @client ||= Cadence::Client.generate + def config + @config ||= Configuration.new + end + + def connection + @connection ||= Cadence::Connection.generate(config.for_connection) end def get_last_completed_decision_task(domain, workflow_id, run_id) diff --git a/lib/cadence/activity/context.rb b/lib/cadence/activity/context.rb index fcea1d92..e5f7d705 100644 --- a/lib/cadence/activity/context.rb +++ b/lib/cadence/activity/context.rb @@ -7,8 +7,8 @@ module Cadence class Activity class Context - def initialize(client, metadata) - @client = client + def initialize(connection, metadata) + @connection = connection @metadata = metadata @async = false end @@ -32,7 +32,7 @@ def async_token def heartbeat(details = nil) logger.debug('Activity heartbeat') - client.record_activity_task_heartbeat(task_token: task_token, details: details) + connection.record_activity_task_heartbeat(task_token: task_token, details: details) end def logger @@ -54,7 +54,7 @@ def headers private - attr_reader :client, :metadata + attr_reader :connection, :metadata def task_token metadata.task_token diff --git a/lib/cadence/activity/poller.rb b/lib/cadence/activity/poller.rb index 366a620a..a51966d3 100644 --- a/lib/cadence/activity/poller.rb +++ b/lib/cadence/activity/poller.rb @@ -1,4 +1,4 @@ -require 'cadence/client' +require 'cadence/connection' require 'cadence/thread_pool' require 'cadence/middleware/chain' require 'cadence/activity/task_processor' @@ -10,10 +10,11 @@ class Poller thread_pool_size: 20 }.freeze - def initialize(domain, task_list, activity_lookup, middleware = [], options = {}) + def initialize(domain, task_list, activity_lookup, config, middleware = [], options = {}) @domain = domain @task_list = task_list @activity_lookup = activity_lookup + @config = config @middleware = middleware @options = DEFAULT_OPTIONS.merge(options) @shutting_down = false @@ -36,10 +37,10 @@ def wait private - attr_reader :domain, :task_list, :activity_lookup, :middleware, :options, :thread + attr_reader :domain, :task_list, :activity_lookup, :config, :middleware, :options, :thread - def client - @client ||= Cadence::Client.generate(options) + def connection + @connection ||= Cadence::Connection.generate(config.for_connection, options) end def shutting_down? @@ -68,17 +69,16 @@ def poll_loop end def poll_for_task - client.poll_for_activity_task(domain: domain, task_list: task_list) + connection.poll_for_activity_task(domain: domain, task_list: task_list) rescue StandardError => error Cadence.logger.error("Unable to poll for an activity task: #{error.inspect}") nil end def process(task) - client = Cadence::Client.generate middleware_chain = Middleware::Chain.new(middleware) - TaskProcessor.new(task, domain, activity_lookup, client, middleware_chain).process + TaskProcessor.new(task, domain, activity_lookup, middleware_chain, config).process end def thread_pool diff --git a/lib/cadence/activity/task_processor.rb b/lib/cadence/activity/task_processor.rb index 14f10733..95dc106c 100644 --- a/lib/cadence/activity/task_processor.rb +++ b/lib/cadence/activity/task_processor.rb @@ -1,18 +1,19 @@ require 'cadence/metadata' require 'cadence/activity/context' require 'cadence/json' +require 'cadence/connection' module Cadence class Activity class TaskProcessor - def initialize(task, domain, activity_lookup, client, middleware_chain) + def initialize(task, domain, activity_lookup, middleware_chain, config) @task = task @domain = domain @task_token = task.taskToken @activity_name = task.activityType.name @activity_class = activity_lookup.find(activity_name) - @client = client @middleware_chain = middleware_chain + @config = config end def process @@ -27,7 +28,7 @@ def process end metadata = Metadata.generate(Metadata::ACTIVITY_TYPE, task, domain) - context = Activity::Context.new(client, metadata) + context = Activity::Context.new(connection, metadata) result = middleware_chain.invoke(metadata) do activity_class.execute_in_context(context, JSON.deserialize(task.input)) @@ -49,7 +50,12 @@ def process private - attr_reader :task, :domain, :task_token, :activity_name, :activity_class, :client, :middleware_chain + attr_reader :task, :domain, :task_token, :activity_name, :activity_class, + :middleware_chain, :config + + def connection + @connection ||= Cadence::Connection.generate(config.for_connection) + end def queue_time_ms ((task.startedTimestamp - task.scheduledTimestampOfThisAttempt) / 1_000_000).round @@ -57,14 +63,14 @@ def queue_time_ms def respond_completed(result) Cadence.logger.info("Activity #{activity_name} completed") - client.respond_activity_task_completed(task_token: task_token, result: result) + connection.respond_activity_task_completed(task_token: task_token, result: result) rescue StandardError => error Cadence.logger.error("Unable to complete Activity #{activity_name}: #{error.inspect}") end def respond_failed(reason, details) Cadence.logger.error("Activity #{activity_name} failed with: #{reason}") - client.respond_activity_task_failed(task_token: task_token, reason: reason, details: details) + connection.respond_activity_task_failed(task_token: task_token, reason: reason, details: details) rescue StandardError => error Cadence.logger.error("Unable to fail Activity #{activity_name}: #{error.inspect}") end diff --git a/lib/cadence/client.rb b/lib/cadence/client.rb deleted file mode 100644 index e5ab4883..00000000 --- a/lib/cadence/client.rb +++ /dev/null @@ -1,21 +0,0 @@ -require 'cadence/client/thrift_client' - -module Cadence - module Client - CLIENT_TYPES_MAP = { - thrift: Cadence::Client::ThriftClient - }.freeze - - def self.generate(options = {}) - client_class = CLIENT_TYPES_MAP[Cadence.configuration.client_type] - host = Cadence.configuration.host - port = Cadence.configuration.port - - hostname = `hostname` - thread_id = Thread.current.object_id - identity = "#{thread_id}@#{hostname}" - - client_class.new(host, port, identity, options) - end - end -end diff --git a/lib/cadence/configuration.rb b/lib/cadence/configuration.rb index 994970ab..55cd31bf 100644 --- a/lib/cadence/configuration.rb +++ b/lib/cadence/configuration.rb @@ -3,8 +3,11 @@ module Cadence class Configuration + Connection = Struct.new(:type, :host, :port, keyword_init: true) + Execution = Struct.new(:domain, :task_list, :timeouts, :headers, keyword_init: true) + attr_reader :timeouts - attr_accessor :client_type, :host, :port, :logger, :metrics_adapter, :domain, :task_list, :headers + attr_accessor :connection_type, :host, :port, :logger, :metrics_adapter, :domain, :task_list, :headers DEFAULT_TIMEOUTS = { execution: 60, # End-to-end workflow time @@ -20,7 +23,7 @@ class Configuration DEFAULT_TASK_LIST = 'default-task-list'.freeze def initialize - @client_type = :thrift + @connection_type = :thrift @logger = Logger.new(STDOUT, progname: 'cadence_client') @metrics_adapter = MetricsAdapters::Null.new @timeouts = DEFAULT_TIMEOUTS @@ -32,5 +35,22 @@ def initialize def timeouts=(new_timeouts) @timeouts = DEFAULT_TIMEOUTS.merge(new_timeouts) end + + def for_connection + Connection.new( + type: connection_type, + host: host, + port: port + ).freeze + end + + def default_execution_options + Execution.new( + domain: domain, + task_list: task_list, + timeouts: timeouts, + headers: headers + ).freeze + end end end diff --git a/lib/cadence/connection.rb b/lib/cadence/connection.rb new file mode 100644 index 00000000..ce32f503 --- /dev/null +++ b/lib/cadence/connection.rb @@ -0,0 +1,21 @@ +require 'cadence/connection/thrift' + +module Cadence + module Connection + CLIENT_TYPES_MAP = { + thrift: Cadence::Connection::Thrift + }.freeze + + def self.generate(configuration, options = {}) + connection_class = CLIENT_TYPES_MAP[configuration.type] + host = configuration.host + port = configuration.port + + hostname = `hostname` + thread_id = Thread.current.object_id + identity = "#{thread_id}@#{hostname}" + + connection_class.new(host, port, identity, options) + end + end +end diff --git a/lib/cadence/client/errors.rb b/lib/cadence/connection/errors.rb similarity index 58% rename from lib/cadence/client/errors.rb rename to lib/cadence/connection/errors.rb index 674c60b9..4f6657c6 100644 --- a/lib/cadence/client/errors.rb +++ b/lib/cadence/connection/errors.rb @@ -1,8 +1,8 @@ module Cadence - module Client + module Connection class Error < StandardError; end - # incorrect arguments passed to the client + # incorrect arguments passed to the connection class ArgumentError < Error; end end end diff --git a/lib/cadence/client/thrift_client.rb b/lib/cadence/connection/thrift.rb similarity index 98% rename from lib/cadence/client/thrift_client.rb rename to lib/cadence/connection/thrift.rb index 38a0386d..e977212d 100644 --- a/lib/cadence/client/thrift_client.rb +++ b/lib/cadence/connection/thrift.rb @@ -1,12 +1,12 @@ require 'thrift' require 'securerandom' require 'cadence/json' -require 'cadence/client/errors' +require 'cadence/connection/errors' require 'gen/thrift/workflow_service' module Cadence - module Client - class ThriftClient + module Connection + class Thrift WORKFLOW_ID_REUSE_POLICY = { allow_failed: CadenceThrift::WorkflowIdReusePolicy::AllowDuplicateFailedOnly, allow: CadenceThrift::WorkflowIdReusePolicy::AllowDuplicate, @@ -344,7 +344,7 @@ def describe_task_list(domain:, task_list:) attr_reader :url, :identity, :options, :mutex def transport - @transport ||= Thrift::HTTPClientTransport.new(url).tap do |http| + @transport ||= ::Thrift::HTTPClientTransport.new(url).tap do |http| http.add_headers( 'Rpc-Caller' => 'ruby-client', 'Rpc-Encoding' => 'thrift', @@ -356,7 +356,7 @@ def transport def connection @connection ||= begin - protocol = Thrift::BinaryProtocol.new(transport) + protocol = ::Thrift::BinaryProtocol.new(transport) CadenceThrift::WorkflowService::Client.new(protocol) end end diff --git a/lib/cadence/execution_options.rb b/lib/cadence/execution_options.rb index c4572df4..25d816ea 100644 --- a/lib/cadence/execution_options.rb +++ b/lib/cadence/execution_options.rb @@ -4,7 +4,7 @@ module Cadence class ExecutionOptions attr_reader :name, :domain, :task_list, :retry_policy, :timeouts, :headers - def initialize(object, options = {}) + def initialize(object, options, defaults = nil) @name = options[:name] || object.to_s @domain = options[:domain] @task_list = options[:task_list] @@ -21,10 +21,12 @@ def initialize(object, options = {}) @headers = object.headers.merge(@headers) if object.headers end - @domain ||= Cadence.configuration.domain - @task_list ||= Cadence.configuration.task_list - @timeouts = Cadence.configuration.timeouts.merge(@timeouts) - @headers = Cadence.configuration.headers.merge(@headers) + if defaults + @domain ||= defaults.domain + @task_list ||= defaults.task_list + @timeouts = defaults.timeouts.merge(@timeouts) + @headers = defaults.headers.merge(@headers) + end freeze end diff --git a/lib/cadence/worker.rb b/lib/cadence/worker.rb index 9f7c1822..a3632950 100644 --- a/lib/cadence/worker.rb +++ b/lib/cadence/worker.rb @@ -1,4 +1,3 @@ -require 'cadence/client' require 'cadence/workflow/poller' require 'cadence/activity/poller' require 'cadence/execution_options' @@ -7,7 +6,8 @@ module Cadence class Worker - def initialize(options = {}) + def initialize(config = Cadence.configuration, **options) + @config = config @options = options @workflows = Hash.new { |hash, key| hash[key] = ExecutableLookup.new } @activities = Hash.new { |hash, key| hash[key] = ExecutableLookup.new } @@ -18,14 +18,14 @@ def initialize(options = {}) end def register_workflow(workflow_class, options = {}) - execution_options = ExecutionOptions.new(workflow_class, options) + execution_options = ExecutionOptions.new(workflow_class, options, config.default_execution_options) key = [execution_options.domain, execution_options.task_list] @workflows[key].add(execution_options.name, workflow_class) end def register_activity(activity_class, options = {}) - execution_options = ExecutionOptions.new(activity_class, options) + execution_options = ExecutionOptions.new(activity_class, options, config.default_execution_options) key = [execution_options.domain, execution_options.task_list] @activities[key].add(execution_options.name, activity_class) @@ -67,7 +67,7 @@ def stop private - attr_reader :options, :activities, :workflows, :pollers, + attr_reader :config, :options, :activities, :workflows, :pollers, :decision_middleware, :activity_middleware def shutting_down? @@ -75,11 +75,11 @@ def shutting_down? end def workflow_poller_for(domain, task_list, lookup) - Workflow::Poller.new(domain, task_list, lookup.freeze, decision_middleware, options) + Workflow::Poller.new(domain, task_list, lookup.freeze, config, decision_middleware, options) end def activity_poller_for(domain, task_list, lookup) - Activity::Poller.new(domain, task_list, lookup.freeze, activity_middleware, options) + Activity::Poller.new(domain, task_list, lookup.freeze, config, activity_middleware, options) end def trap_signals diff --git a/lib/cadence/workflow/context.rb b/lib/cadence/workflow/context.rb index 38564b52..9f70fe61 100644 --- a/lib/cadence/workflow/context.rb +++ b/lib/cadence/workflow/context.rb @@ -15,10 +15,11 @@ module Cadence class Workflow class Context - def initialize(state_manager, dispatcher, metadata) + def initialize(state_manager, dispatcher, metadata, config) @state_manager = state_manager @dispatcher = dispatcher @metadata = metadata + @config = config end def logger @@ -39,7 +40,7 @@ def execute_activity(activity_class, *input, **args) options = args.delete(:options) || {} input << args unless args.empty? - execution_options = ExecutionOptions.new(activity_class, options) + execution_options = ExecutionOptions.new(activity_class, options, config.default_execution_options) decision = Decision::ScheduleActivity.new( activity_id: options[:activity_id], @@ -97,7 +98,7 @@ def execute_workflow(workflow_class, *input, **args) options = args.delete(:options) || {} input << args unless args.empty? - execution_options = ExecutionOptions.new(workflow_class, options) + execution_options = ExecutionOptions.new(workflow_class, options, config.default_execution_options) decision = Decision::StartChildWorkflow.new( workflow_id: options[:workflow_id] || SecureRandom.uuid, @@ -243,7 +244,7 @@ def cancel(target, cancelation_id) private - attr_reader :state_manager, :dispatcher, :metadata + attr_reader :state_manager, :dispatcher, :metadata, :config def schedule_decision(decision) state_manager.schedule(decision) diff --git a/lib/cadence/workflow/decision_task_processor.rb b/lib/cadence/workflow/decision_task_processor.rb index c261bb39..25f699c6 100644 --- a/lib/cadence/workflow/decision_task_processor.rb +++ b/lib/cadence/workflow/decision_task_processor.rb @@ -8,14 +8,14 @@ class Workflow class DecisionTaskProcessor MAX_FAILED_ATTEMPTS = 50 - def initialize(task, domain, workflow_lookup, client, middleware_chain) + def initialize(task, domain, workflow_lookup, middleware_chain, config) @task = task @domain = domain @task_token = task.taskToken @workflow_name = task.workflowType.name @workflow_class = workflow_lookup.find(workflow_name) - @client = client @middleware_chain = middleware_chain + @config = config end def process @@ -31,7 +31,7 @@ def process history = fetch_full_history # TODO: For sticky workflows we need to cache the Executor instance - executor = Workflow::Executor.new(workflow_class, history) + executor = Workflow::Executor.new(workflow_class, history, config) metadata = Metadata.generate(Metadata::DECISION_TYPE, task, domain) decisions = middleware_chain.invoke(metadata) do @@ -50,7 +50,12 @@ def process private - attr_reader :task, :domain, :task_token, :workflow_name, :workflow_class, :client, :middleware_chain + attr_reader :task, :domain, :task_token, :workflow_name, :workflow_class, + :middleware_chain, :config + + def connection + @connection ||= Cadence::Connection.generate(config.for_connection) + end def queue_time_ms ((task.startedTimestamp - task.scheduledTimestamp) / 1_000_000).round @@ -65,7 +70,7 @@ def fetch_full_history next_page_token = task.nextPageToken while next_page_token do - response = client.get_workflow_execution_history( + response = connection.get_workflow_execution_history( domain: domain, workflow_id: task.workflowExecution.workflowId, run_id: task.workflowExecution.runId, @@ -82,7 +87,7 @@ def fetch_full_history def complete_task(decisions) Cadence.logger.info("Decision task for #{workflow_name} completed") - client.respond_decision_task_completed( + connection.respond_decision_task_completed( task_token: task_token, decisions: serialize_decisions(decisions) ) @@ -94,7 +99,7 @@ def fail_task(message) # Stop from getting into infinite loop if the error persists return if task.attempt >= MAX_FAILED_ATTEMPTS - client.respond_decision_task_failed( + connection.respond_decision_task_failed( task_token: task_token, cause: CadenceThrift::DecisionTaskFailedCause::UNHANDLED_DECISION, details: message diff --git a/lib/cadence/workflow/executor.rb b/lib/cadence/workflow/executor.rb index 453781ee..91e30692 100644 --- a/lib/cadence/workflow/executor.rb +++ b/lib/cadence/workflow/executor.rb @@ -8,11 +8,12 @@ module Cadence class Workflow class Executor - def initialize(workflow_class, history) + def initialize(workflow_class, history, config) @workflow_class = workflow_class @dispatcher = Dispatcher.new @state_manager = StateManager.new(dispatcher) @history = history + @config = config end def run @@ -31,10 +32,10 @@ def run private - attr_reader :workflow_class, :dispatcher, :state_manager, :history + attr_reader :workflow_class, :dispatcher, :state_manager, :history, :config def execute_workflow(input, metadata) - context = Workflow::Context.new(state_manager, dispatcher, metadata) + context = Workflow::Context.new(state_manager, dispatcher, metadata, config) Fiber.new do workflow_class.execute_in_context(context, input) diff --git a/lib/cadence/workflow/poller.rb b/lib/cadence/workflow/poller.rb index f888b153..0fcb8317 100644 --- a/lib/cadence/workflow/poller.rb +++ b/lib/cadence/workflow/poller.rb @@ -1,4 +1,4 @@ -require 'cadence/client' +require 'cadence/connection' require 'cadence/thread_pool' require 'cadence/middleware/chain' require 'cadence/workflow/decision_task_processor' @@ -10,10 +10,11 @@ class Poller thread_pool_size: 1 }.freeze - def initialize(domain, task_list, workflow_lookup, middleware = [], options = {}) + def initialize(domain, task_list, workflow_lookup, config, middleware = [], options = {}) @domain = domain @task_list = task_list @workflow_lookup = workflow_lookup + @config = config @middleware = middleware @options = DEFAULT_OPTIONS.merge(options) @shutting_down = false @@ -36,10 +37,10 @@ def wait private - attr_reader :domain, :task_list, :client, :workflow_lookup, :middleware, :options + attr_reader :domain, :task_list, :connection, :workflow_lookup, :config, :middleware, :options - def client - @client ||= Cadence::Client.generate(options) + def connection + @connection ||= Cadence::Connection.generate(config.for_connection, options) end def shutting_down? @@ -68,17 +69,16 @@ def poll_loop end def poll_for_task - client.poll_for_decision_task(domain: domain, task_list: task_list) + connection.poll_for_decision_task(domain: domain, task_list: task_list) rescue StandardError => error Cadence.logger.error("Unable to poll for a decision task: #{error.inspect}") nil end def process(task) - client = Cadence::Client.generate middleware_chain = Middleware::Chain.new(middleware) - DecisionTaskProcessor.new(task, domain, workflow_lookup, client, middleware_chain).process + DecisionTaskProcessor.new(task, domain, workflow_lookup, middleware_chain, config).process end def thread_pool diff --git a/spec/unit/lib/cadence/activity/context_spec.rb b/spec/unit/lib/cadence/activity/context_spec.rb index d26d3d6e..f69985a3 100644 --- a/spec/unit/lib/cadence/activity/context_spec.rb +++ b/spec/unit/lib/cadence/activity/context_spec.rb @@ -1,21 +1,22 @@ require 'cadence/activity/context' require 'cadence/metadata/activity' +require 'cadence/connection/thrift' describe Cadence::Activity::Context do - let(:client) { instance_double('Cadence::Client::ThriftClient') } + let(:connection) { instance_double('Cadence::Connection::Thrift') } let(:metadata_hash) { Fabricate(:activity_metadata).to_h } let(:metadata) { Cadence::Metadata::Activity.new(metadata_hash) } let(:task_token) { SecureRandom.uuid } - subject { described_class.new(client, metadata) } + subject { described_class.new(connection, metadata) } describe '#heartbeat' do - before { allow(client).to receive(:record_activity_task_heartbeat) } + before { allow(connection).to receive(:record_activity_task_heartbeat) } it 'records heartbeat' do subject.heartbeat - expect(client) + expect(connection) .to have_received(:record_activity_task_heartbeat) .with(task_token: metadata.task_token, details: nil) end @@ -23,7 +24,7 @@ it 'records heartbeat with details' do subject.heartbeat(foo: :bar) - expect(client) + expect(connection) .to have_received(:record_activity_task_heartbeat) .with(task_token: metadata.task_token, details: { foo: :bar }) end @@ -37,7 +38,7 @@ describe '#async?' do subject { context.async? } - let(:context) { described_class.new(client, metadata) } + let(:context) { described_class.new(connection, metadata) } context 'when context is sync' do it { is_expected.to eq(false) } diff --git a/spec/unit/lib/cadence/activity/poller_spec.rb b/spec/unit/lib/cadence/activity/poller_spec.rb index 0c45d35d..d33cf69e 100644 --- a/spec/unit/lib/cadence/activity/poller_spec.rb +++ b/spec/unit/lib/cadence/activity/poller_spec.rb @@ -1,24 +1,26 @@ require 'cadence/activity/poller' require 'cadence/middleware/entry' +require 'cadence/configuration' describe Cadence::Activity::Poller do - let(:client) { instance_double('Cadence::Client::ThriftClient') } + let(:connection) { instance_double('Cadence::Connection::Thrift') } let(:domain) { 'test-domain' } let(:task_list) { 'test-task-list' } let(:lookup) { instance_double('Cadence::ExecutableLookup') } let(:thread_pool) do instance_double(Cadence::ThreadPool, wait_for_available_threads: nil, shutdown: nil) end + let(:config) { Cadence::Configuration.new } let(:middleware_chain) { instance_double(Cadence::Middleware::Chain) } let(:middleware) { [] } - subject { described_class.new(domain, task_list, lookup, middleware) } + subject { described_class.new(domain, task_list, lookup, config, middleware) } before do - allow(Cadence::Client).to receive(:generate).and_return(client) + allow(Cadence::Connection).to receive(:generate).and_return(connection) allow(Cadence::ThreadPool).to receive(:new).and_return(thread_pool) allow(Cadence::Middleware::Chain).to receive(:new).and_return(middleware_chain) - allow(client).to receive(:poll_for_activity_task).and_return(nil) + allow(connection).to receive(:poll_for_activity_task).and_return(nil) allow(Cadence.metrics).to receive(:timing) end @@ -31,13 +33,13 @@ # stop poller before inspecting subject.stop; subject.wait - expect(client) + expect(connection) .to have_received(:poll_for_activity_task) .with(domain: domain, task_list: task_list) .twice end - it 'polls for activity tasks' do + it 'measures time between polls' do allow(subject).to receive(:shutting_down?).and_return(false, false, true) subject.start @@ -57,20 +59,22 @@ end context 'with options passed' do - subject { described_class.new(domain, task_list, lookup, middleware, options) } + subject { described_class.new(domain, task_list, lookup, config, middleware, options) } let(:options) { { polling_ttl: 42, thread_pool_size: 42 } } before do allow(subject).to receive(:shutting_down?).and_return(false, true) end - it 'passes options to the client' do + it 'passes options to the connection' do subject.start # stop poller before inspecting subject.stop; subject.wait - expect(Cadence::Client).to have_received(:generate).with(options) + expect(Cadence::Connection) + .to have_received(:generate) + .with(config.for_connection, options) end it 'creates thread pool of a specified size' do @@ -89,7 +93,7 @@ before do allow(subject).to receive(:shutting_down?).and_return(false, true) - allow(client).to receive(:poll_for_activity_task).and_return(task) + allow(connection).to receive(:poll_for_activity_task).and_return(task) allow(Cadence::Activity::TaskProcessor).to receive(:new).and_return(task_processor) allow(thread_pool).to receive(:schedule).and_yield end @@ -111,7 +115,7 @@ expect(Cadence::Activity::TaskProcessor) .to have_received(:new) - .with(task, domain, lookup, client, middleware_chain) + .with(task, domain, lookup, middleware_chain, config) expect(task_processor).to have_received(:process) end @@ -134,15 +138,15 @@ def call(_); end expect(Cadence::Middleware::Chain).to have_received(:new).with(middleware) expect(Cadence::Activity::TaskProcessor) .to have_received(:new) - .with(task, domain, lookup, client, middleware_chain) + .with(task, domain, lookup, middleware_chain, config) end end end - context 'when client is unable to poll' do + context 'when connection is unable to poll' do before do allow(subject).to receive(:shutting_down?).and_return(false, true) - allow(client).to receive(:poll_for_activity_task).and_raise(StandardError) + allow(connection).to receive(:poll_for_activity_task).and_raise(StandardError) end it 'logs' do diff --git a/spec/unit/lib/cadence/activity/task_processor_spec.rb b/spec/unit/lib/cadence/activity/task_processor_spec.rb index 67e2a949..c88983bc 100644 --- a/spec/unit/lib/cadence/activity/task_processor_spec.rb +++ b/spec/unit/lib/cadence/activity/task_processor_spec.rb @@ -1,8 +1,9 @@ require 'cadence/activity/task_processor' require 'cadence/middleware/chain' +require 'cadence/configuration' describe Cadence::Activity::TaskProcessor do - subject { described_class.new(task, domain, lookup, client, middleware_chain) } + subject { described_class.new(task, domain, lookup, middleware_chain, config) } let(:domain) { 'test-domain' } let(:lookup) { instance_double('Cadence::ExecutableLookup', find: nil) } @@ -11,22 +12,27 @@ end let(:metadata) { Cadence::Metadata.generate(Cadence::Metadata::ACTIVITY_TYPE, task) } let(:activity_name) { 'TestActivity' } - let(:client) { instance_double('Cadence::Client::ThriftClient') } + let(:connection) { instance_double('Cadence::Connection::Thrift') } let(:middleware_chain) { Cadence::Middleware::Chain.new } + let(:config) { Cadence::Configuration.new } let(:input) { ['arg1', 'arg2'] } describe '#process' do let(:context) { instance_double('Cadence::Activity::Context', async?: false) } before do + allow(Cadence::Connection) + .to receive(:generate) + .with(config.for_connection) + .and_return(connection) allow(Cadence::Metadata) .to receive(:generate) .with(Cadence::Metadata::ACTIVITY_TYPE, task, domain) .and_return(metadata) - allow(Cadence::Activity::Context).to receive(:new).with(client, metadata).and_return(context) + allow(Cadence::Activity::Context).to receive(:new).with(connection, metadata).and_return(context) - allow(client).to receive(:respond_activity_task_completed) - allow(client).to receive(:respond_activity_task_failed) + allow(connection).to receive(:respond_activity_task_completed) + allow(connection).to receive(:respond_activity_task_failed) allow(middleware_chain).to receive(:invoke).and_call_original @@ -37,7 +43,7 @@ it 'fails the activity task' do subject.process - expect(client) + expect(connection) .to have_received(:respond_activity_task_failed) .with( task_token: task.taskToken, @@ -46,8 +52,8 @@ ) end - it 'ignores client exception' do - allow(client) + it 'ignores connection exception' do + allow(connection) .to receive(:respond_activity_task_failed) .and_raise(StandardError) @@ -80,13 +86,13 @@ it 'completes the activity task' do subject.process - expect(client) + expect(connection) .to have_received(:respond_activity_task_completed) .with(task_token: task.taskToken, result: 'result') end - it 'ignores client exception' do - allow(client) + it 'ignores connection exception' do + allow(connection) .to receive(:respond_activity_task_completed) .and_raise(StandardError) @@ -115,7 +121,7 @@ it 'does not complete the activity task' do subject.process - expect(client).not_to have_received(:respond_activity_task_completed) + expect(connection).not_to have_received(:respond_activity_task_completed) end end end @@ -140,7 +146,7 @@ it 'fails the activity task' do subject.process - expect(client) + expect(connection) .to have_received(:respond_activity_task_failed) .with( task_token: task.taskToken, @@ -149,8 +155,8 @@ ) end - it 'ignores client exception' do - allow(client) + it 'ignores connection exception' do + allow(connection) .to receive(:respond_activity_task_failed) .and_raise(StandardError) @@ -179,7 +185,7 @@ it 'fails the activity task' do subject.process - expect(client) + expect(connection) .to have_received(:respond_activity_task_failed) .with( task_token: task.taskToken, @@ -195,7 +201,7 @@ it 'does not handle the exception' do expect { subject.process }.to raise_error(exception) - expect(client).not_to have_received(:respond_activity_task_failed) + expect(connection).not_to have_received(:respond_activity_task_failed) end end @@ -205,7 +211,7 @@ it 'fails the activity task' do subject.process - expect(client) + expect(connection) .to have_received(:respond_activity_task_failed) .with(task_token: task.taskToken, reason: 'StandardError', details: 'activity failed') end diff --git a/spec/unit/lib/cadence/testing_spec.rb b/spec/unit/lib/cadence/testing_spec.rb index ffc122cf..62431061 100644 --- a/spec/unit/lib/cadence/testing_spec.rb +++ b/spec/unit/lib/cadence/testing_spec.rb @@ -11,18 +11,18 @@ def execute; end context 'when testing mode is disabled' do describe 'Cadence.start_workflow' do - let(:client) { instance_double('Cadence::Client::ThriftClient') } + let(:connection) { instance_double('Cadence::Connection::Thrift') } let(:response) { CadenceThrift::StartWorkflowExecutionResponse.new(runId: 'xxx') } - before { allow(Cadence::Client).to receive(:generate).and_return(client) } - after { Cadence.remove_instance_variable(:@client) } + before { allow(Cadence::Connection).to receive(:generate).and_return(connection) } + after { Cadence.remove_instance_variable(:@connection) } it 'invokes original implementation' do - allow(client).to receive(:start_workflow_execution).and_return(response) + allow(connection).to receive(:start_workflow_execution).and_return(response) Cadence.start_workflow(TestCadenceOverrideWorkflow) - expect(client) + expect(connection) .to have_received(:start_workflow_execution) .with(hash_including(workflow_name: 'TestCadenceOverrideWorkflow')) end diff --git a/spec/unit/lib/cadence/worker_spec.rb b/spec/unit/lib/cadence/worker_spec.rb index 0a98dda9..bcfe7172 100644 --- a/spec/unit/lib/cadence/worker_spec.rb +++ b/spec/unit/lib/cadence/worker_spec.rb @@ -1,8 +1,12 @@ require 'cadence/worker' require 'cadence/workflow' require 'cadence/activity' +require 'cadence/configuration' describe Cadence::Worker do + subject { described_class.new(config) } + let(:config) { Cadence::Configuration.new } + class TestWorkerWorkflow < Cadence::Workflow domain 'default-domain' task_list 'default-task-list' @@ -134,22 +138,22 @@ class TestWorkerActivity < Cadence::Activity allow(Cadence::Workflow::Poller) .to receive(:new) - .with('default-domain', 'default-task-list', an_instance_of(Cadence::ExecutableLookup), [], {}) + .with('default-domain', 'default-task-list', an_instance_of(Cadence::ExecutableLookup), config, [], {}) .and_return(workflow_poller_1) allow(Cadence::Workflow::Poller) .to receive(:new) - .with('other-domain', 'default-task-list', an_instance_of(Cadence::ExecutableLookup), [], {}) + .with('other-domain', 'default-task-list', an_instance_of(Cadence::ExecutableLookup), config, [], {}) .and_return(workflow_poller_2) allow(Cadence::Activity::Poller) .to receive(:new) - .with('default-domain', 'default-task-list', an_instance_of(Cadence::ExecutableLookup), [], {}) + .with('default-domain', 'default-task-list', an_instance_of(Cadence::ExecutableLookup), config, [], {}) .and_return(activity_poller_1) allow(Cadence::Activity::Poller) .to receive(:new) - .with('default-domain', 'other-task-list', an_instance_of(Cadence::ExecutableLookup), [], {}) + .with('default-domain', 'other-task-list', an_instance_of(Cadence::ExecutableLookup), config, [], {}) .and_return(activity_poller_2) subject.register_workflow(TestWorkerWorkflow) @@ -166,7 +170,7 @@ class TestWorkerActivity < Cadence::Activity end context 'with options' do - subject { described_class.new(options) } + subject { described_class.new(config, options) } let(:options) { { polling_ttl: 42, thread_pool_size: 42 } } before do @@ -184,6 +188,7 @@ class TestWorkerActivity < Cadence::Activity 'default-domain', 'default-task-list', an_instance_of(Cadence::ExecutableLookup), + config, [], options ) @@ -214,12 +219,12 @@ class TestWorkerActivity < Cadence::Activity allow(Cadence::Workflow::Poller) .to receive(:new) - .with('default-domain', 'default-task-list', an_instance_of(Cadence::ExecutableLookup), [entry_1], {}) + .with('default-domain', 'default-task-list', an_instance_of(Cadence::ExecutableLookup), config, [entry_1], {}) .and_return(workflow_poller_1) allow(Cadence::Activity::Poller) .to receive(:new) - .with('default-domain', 'default-task-list', an_instance_of(Cadence::ExecutableLookup), [entry_2], {}) + .with('default-domain', 'default-task-list', an_instance_of(Cadence::ExecutableLookup), config, [entry_2], {}) .and_return(activity_poller_1) subject.register_workflow(TestWorkerWorkflow) diff --git a/spec/unit/lib/cadence/workflow/context_spec.rb b/spec/unit/lib/cadence/workflow/context_spec.rb index 611747c0..7707fcfc 100644 --- a/spec/unit/lib/cadence/workflow/context_spec.rb +++ b/spec/unit/lib/cadence/workflow/context_spec.rb @@ -1,15 +1,17 @@ require 'cadence/testing/local_workflow_context' require 'cadence/workflow/context' require 'cadence/workflow/dispatcher' +require 'cadence/configuration' describe Cadence::Workflow::Context do let(:state_manager) { instance_double('Cadence::Workflow::StateManager') } let(:dispatcher) { Cadence::Workflow::Dispatcher.new } - let(:metadata_hash) do + let(:metadata_hash) do {name: 'TestWorkflow', run_id: SecureRandom.uuid, attempt: 0, timeouts: { execution: 15, task: 10 } } end let(:metadata) { Cadence::Metadata::Workflow.new(metadata_hash) } - let(:context) { described_class.new(state_manager, dispatcher, metadata) } + let(:config) { Cadence::Configuration.new } + let(:context) { described_class.new(state_manager, dispatcher, metadata, config) } describe '.sleep_until' do let(:start_time) { Time.now} diff --git a/spec/unit/lib/cadence/workflow/decision_task_processor_spec.rb b/spec/unit/lib/cadence/workflow/decision_task_processor_spec.rb index 16715abf..7331a9ab 100644 --- a/spec/unit/lib/cadence/workflow/decision_task_processor_spec.rb +++ b/spec/unit/lib/cadence/workflow/decision_task_processor_spec.rb @@ -1,28 +1,34 @@ require 'cadence/workflow/decision_task_processor' require 'cadence/workflow' require 'cadence/executable_lookup' -require 'cadence/client/thrift_client' +require 'cadence/connection/thrift' require 'cadence/middleware/chain' +require 'cadence/configuration' describe Cadence::Workflow::DecisionTaskProcessor do class TestWorkflow < Cadence::Workflow; end - subject { described_class.new(task, domain, lookup, client, middleware_chain) } + subject { described_class.new(task, domain, lookup, middleware_chain, config) } let(:task) { Fabricate(:decision_task_thrift) } let(:domain) { 'test-domain' } let(:lookup) { Cadence::ExecutableLookup.new } - let(:client) do + let(:connection) do instance_double( - Cadence::Client::ThriftClient, + Cadence::Connection::Thrift, respond_decision_task_completed: nil, respond_decision_task_failed: nil ) end let(:middleware_chain) { Cadence::Middleware::Chain.new } let(:executor) { instance_double(Cadence::Workflow::Executor, run: []) } + let(:config) { Cadence::Configuration.new } before do + allow(Cadence::Connection) + .to receive(:generate) + .with(config.for_connection) + .and_return(connection) allow(Cadence.metrics).to receive(:timing) allow(Cadence.logger).to receive(:info) allow(Cadence.logger).to receive(:error) @@ -35,7 +41,7 @@ class TestWorkflow < Cadence::Workflow; end allow(Cadence::Workflow::Executor) .to receive(:new) - .with(TestWorkflow, an_instance_of(Cadence::Workflow::History)) + .with(TestWorkflow, an_instance_of(Cadence::Workflow::History), config) .and_return(executor) end @@ -61,7 +67,7 @@ class TestWorkflow < Cadence::Workflow; end it 'completes the decision task' do subject.process - expect(client) + expect(connection) .to have_received(:respond_decision_task_completed) .with(task_token: task.taskToken, decisions: []) end @@ -84,7 +90,7 @@ class TestWorkflow < Cadence::Workflow; end let(:history_2) { Fabricate(:worklfow_execution_history_thrift, nextPageToken: nil) } before do - allow(client) + allow(connection) .to receive(:get_workflow_execution_history) .and_return(history_1, history_2) end @@ -92,7 +98,7 @@ class TestWorkflow < Cadence::Workflow; end it 'fetches missing history pages' do subject.process - expect(client) + expect(connection) .to have_received(:get_workflow_execution_history) .with( domain: domain, @@ -101,7 +107,7 @@ class TestWorkflow < Cadence::Workflow; end next_page_token: task.nextPageToken ) - expect(client) + expect(connection) .to have_received(:get_workflow_execution_history) .with( domain: domain, @@ -123,7 +129,7 @@ class TestWorkflow < Cadence::Workflow; end context 'when unable to complete a workflow' do before do - allow(client) + allow(connection) .to receive(:respond_decision_task_completed) .and_raise(StandardError, 'Host unreachable') end @@ -146,7 +152,7 @@ class TestWorkflow < Cadence::Workflow; end it 'fails the decision task' do subject.process - expect(client) + expect(connection) .to have_received(:respond_decision_task_failed) .with( task_token: task.taskToken, @@ -176,7 +182,7 @@ class TestWorkflow < Cadence::Workflow; end subject.process - expect(client).not_to have_received(:respond_decision_task_failed) + expect(connection).not_to have_received(:respond_decision_task_failed) end end end diff --git a/spec/unit/lib/cadence/workflow/poller_spec.rb b/spec/unit/lib/cadence/workflow/poller_spec.rb index 097ab487..1acadaa9 100644 --- a/spec/unit/lib/cadence/workflow/poller_spec.rb +++ b/spec/unit/lib/cadence/workflow/poller_spec.rb @@ -1,24 +1,26 @@ require 'cadence/workflow/poller' require 'cadence/middleware/entry' +require 'cadence/configuration' describe Cadence::Workflow::Poller do - let(:client) { instance_double('Cadence::Client::ThriftClient') } + let(:connection) { instance_double('Cadence::Connection::Thrift') } let(:domain) { 'test-domain' } let(:task_list) { 'test-task-list' } let(:lookup) { instance_double('Cadence::ExecutableLookup') } let(:thread_pool) do instance_double(Cadence::ThreadPool, wait_for_available_threads: nil, shutdown: nil) end + let(:config) { Cadence::Configuration.new } let(:middleware_chain) { instance_double(Cadence::Middleware::Chain) } let(:middleware) { [] } - subject { described_class.new(domain, task_list, lookup, middleware) } + subject { described_class.new(domain, task_list, lookup, config, middleware) } before do - allow(Cadence::Client).to receive(:generate).and_return(client) + allow(Cadence::Connection).to receive(:generate).and_return(connection) allow(Cadence::ThreadPool).to receive(:new).and_return(thread_pool) allow(Cadence::Middleware::Chain).to receive(:new).and_return(middleware_chain) - allow(client).to receive(:poll_for_decision_task).and_return(nil) + allow(connection).to receive(:poll_for_decision_task).and_return(nil) allow(Cadence.metrics).to receive(:timing) end @@ -31,7 +33,7 @@ # stop poller before inspecting subject.stop; subject.wait - expect(client) + expect(connection) .to have_received(:poll_for_decision_task) .with(domain: domain, task_list: task_list) .twice @@ -57,20 +59,22 @@ end context 'with options passed' do - subject { described_class.new(domain, task_list, lookup, middleware, options) } + subject { described_class.new(domain, task_list, lookup, config, middleware, options) } let(:options) { { polling_ttl: 42, thread_pool_size: 42 } } before do allow(subject).to receive(:shutting_down?).and_return(false, true) end - it 'passes options to the client' do + it 'passes options to the connection' do subject.start # stop poller before inspecting subject.stop; subject.wait - expect(Cadence::Client).to have_received(:generate).with(options) + expect(Cadence::Connection) + .to have_received(:generate) + .with(config.for_connection, options) end it 'creates thread pool of a specified size' do @@ -91,7 +95,7 @@ before do allow(subject).to receive(:shutting_down?).and_return(false, true) - allow(client).to receive(:poll_for_decision_task).and_return(task) + allow(connection).to receive(:poll_for_decision_task).and_return(task) allow(Cadence::Workflow::DecisionTaskProcessor).to receive(:new).and_return(task_processor) allow(thread_pool).to receive(:schedule).and_yield end @@ -113,7 +117,7 @@ expect(Cadence::Workflow::DecisionTaskProcessor) .to have_received(:new) - .with(task, domain, lookup, client, middleware_chain) + .with(task, domain, lookup, middleware_chain, config) expect(task_processor).to have_received(:process) end @@ -136,15 +140,15 @@ def call(_); end expect(Cadence::Middleware::Chain).to have_received(:new).with(middleware) expect(Cadence::Workflow::DecisionTaskProcessor) .to have_received(:new) - .with(task, domain, lookup, client, middleware_chain) + .with(task, domain, lookup, middleware_chain, config) end end end - context 'when client is unable to poll' do + context 'when connection is unable to poll' do before do allow(subject).to receive(:shutting_down?).and_return(false, true) - allow(client).to receive(:poll_for_decision_task).and_raise(StandardError) + allow(connection).to receive(:poll_for_decision_task).and_raise(StandardError) end it 'logs' do diff --git a/spec/unit/lib/cadence_spec.rb b/spec/unit/lib/cadence_spec.rb index dd03b2f8..bcf190f8 100644 --- a/spec/unit/lib/cadence_spec.rb +++ b/spec/unit/lib/cadence_spec.rb @@ -1,19 +1,20 @@ require 'cadence' require 'cadence/workflow' +require 'cadence/connection/thrift' describe Cadence do describe 'client operations' do - let(:client) { instance_double(Cadence::Client::ThriftClient) } + let(:connection) { instance_double(Cadence::Connection::Thrift) } - before { allow(Cadence::Client).to receive(:generate).and_return(client) } - after { described_class.remove_instance_variable(:@client) } + before { allow(Cadence::Connection).to receive(:generate).and_return(connection) } + after { described_class.remove_instance_variable(:@connection) } describe '.start_workflow' do let(:cadence_response) do CadenceThrift::StartWorkflowExecutionResponse.new(runId: 'xxx') end - before { allow(client).to receive(:start_workflow_execution).and_return(cadence_response) } + before { allow(connection).to receive(:start_workflow_execution).and_return(cadence_response) } context 'using a workflow class' do class TestStartWorkflow < Cadence::Workflow @@ -30,7 +31,7 @@ class TestStartWorkflow < Cadence::Workflow it 'starts a workflow using the default options' do described_class.start_workflow(TestStartWorkflow, 42) - expect(client) + expect(connection) .to have_received(:start_workflow_execution) .with( domain: 'default-test-domain', @@ -57,7 +58,7 @@ class TestStartWorkflow < Cadence::Workflow } ) - expect(client) + expect(connection) .to have_received(:start_workflow_execution) .with( domain: 'test-domain', @@ -86,7 +87,7 @@ class TestStartWorkflow < Cadence::Workflow } ) - expect(client) + expect(connection) .to have_received(:start_workflow_execution) .with( domain: 'test-domain', @@ -111,7 +112,7 @@ class TestStartWorkflow < Cadence::Workflow options: { name: 'test-workflow' } ) - expect(client) + expect(connection) .to have_received(:start_workflow_execution) .with( domain: 'default-test-domain', @@ -129,7 +130,7 @@ class TestStartWorkflow < Cadence::Workflow it 'starts a workflow using specified workflow_id' do described_class.start_workflow(TestStartWorkflow, 42, options: { workflow_id: '123' }) - expect(client) + expect(connection) .to have_received(:start_workflow_execution) .with( domain: 'default-test-domain', @@ -149,7 +150,7 @@ class TestStartWorkflow < Cadence::Workflow TestStartWorkflow, 42, options: { workflow_id_reuse_policy: :allow } ) - expect(client) + expect(connection) .to have_received(:start_workflow_execution) .with( domain: 'default-test-domain', @@ -173,7 +174,7 @@ class TestStartWorkflow < Cadence::Workflow options: { domain: 'test-domain', task_list: 'test-task-list' } ) - expect(client) + expect(connection) .to have_received(:start_workflow_execution) .with( domain: 'test-domain', @@ -191,12 +192,12 @@ class TestStartWorkflow < Cadence::Workflow end describe '.register_domain' do - before { allow(client).to receive(:register_domain).and_return(nil) } + before { allow(connection).to receive(:register_domain).and_return(nil) } it 'registers domain with the specified name' do described_class.register_domain('new-domain') - expect(client) + expect(connection) .to have_received(:register_domain) .with(name: 'new-domain', description: nil) end @@ -204,14 +205,14 @@ class TestStartWorkflow < Cadence::Workflow it 'registers domain with the specified name and description' do described_class.register_domain('new-domain', 'domain description') - expect(client) + expect(connection) .to have_received(:register_domain) .with(name: 'new-domain', description: 'domain description') end context 'when domain is already registered' do before do - allow(client) + allow(connection) .to receive(:register_domain) .and_raise(CadenceThrift::DomainAlreadyExistsError) end @@ -225,12 +226,12 @@ class TestStartWorkflow < Cadence::Workflow end describe '.terminate_workflow' do - before { allow(client).to receive(:terminate_workflow_execution).and_return(nil) } + before { allow(connection).to receive(:terminate_workflow_execution).and_return(nil) } it 'terminates workflow execution' do described_class.terminate_workflow('test-domain', 'xxx', 'yyy') - expect(client) + expect(connection) .to have_received(:terminate_workflow_execution) .with( domain: 'test-domain', @@ -250,7 +251,7 @@ class TestStartWorkflow < Cadence::Workflow details: '{ "foo": "bar" }' ) - expect(client) + expect(connection) .to have_received(:terminate_workflow_execution) .with( domain: 'test-domain', @@ -271,12 +272,12 @@ class TestStartWorkflow < Cadence::Workflow end let(:info_thrift) { Fabricate(:workflow_execution_info_thrift) } - before { allow(client).to receive(:describe_workflow_execution).and_return(response) } + before { allow(connection).to receive(:describe_workflow_execution).and_return(response) } it 'requests execution info from Cadence' do described_class.fetch_workflow_execution_info('domain', '111', '222') - expect(client) + expect(connection) .to have_received(:describe_workflow_execution) .with(domain: 'domain', workflow_id: '111', run_id: '222') end @@ -296,16 +297,16 @@ class TestStartWorkflow < Cadence::Workflow before do allow(history_mock).to receive(:events).and_return([event_mock]) allow(response_mock).to receive(:history).and_return(history_mock) - allow(client).to receive(:get_workflow_execution_history).and_return(response_mock) + allow(connection).to receive(:get_workflow_execution_history).and_return(response_mock) end - it 'wraps client get_workflow_execution_history' do + it 'wraps connection get_workflow_execution_history' do described_class.get_workflow_history( domain:'default-test-domain', workflow_id: '123', run_id: '1234' ) - expect(client).to have_received(:get_workflow_execution_history).with( + expect(connection).to have_received(:get_workflow_execution_history).with( domain: 'default-test-domain', workflow_id: '123', run_id: '1234' @@ -316,12 +317,12 @@ class TestStartWorkflow < Cadence::Workflow describe '.reset_workflow' do let(:cadence_response) { CadenceThrift::StartWorkflowExecutionResponse.new(runId: 'xxx') } - before { allow(client).to receive(:reset_workflow_execution).and_return(cadence_response) } + before { allow(connection).to receive(:reset_workflow_execution).and_return(cadence_response) } context 'when decision_task_id is provided' do let(:decision_task_id) { 42 } - it 'calls client reset_workflow_execution' do + it 'calls connection reset_workflow_execution' do described_class.reset_workflow( 'default-test-domain', '123', @@ -330,7 +331,7 @@ class TestStartWorkflow < Cadence::Workflow reason: 'Test reset' ) - expect(client).to have_received(:reset_workflow_execution).with( + expect(connection).to have_received(:reset_workflow_execution).with( domain: 'default-test-domain', workflow_id: '123', run_id: '1234', @@ -362,12 +363,12 @@ class TestStartWorkflow < Cadence::Workflow end describe '.complete_activity' do - before { allow(client).to receive(:respond_activity_task_completed_by_id).and_return(nil) } + before { allow(connection).to receive(:respond_activity_task_completed_by_id).and_return(nil) } it 'completes activity with a result' do described_class.complete_activity(async_token, 'all work completed') - expect(client) + expect(connection) .to have_received(:respond_activity_task_completed_by_id) .with( domain: domain, @@ -381,7 +382,7 @@ class TestStartWorkflow < Cadence::Workflow it 'completes activity without a result' do described_class.complete_activity(async_token) - expect(client) + expect(connection) .to have_received(:respond_activity_task_completed_by_id) .with( domain: domain, @@ -394,12 +395,12 @@ class TestStartWorkflow < Cadence::Workflow end describe '.fail_activity' do - before { allow(client).to receive(:respond_activity_task_failed_by_id).and_return(nil) } + before { allow(connection).to receive(:respond_activity_task_failed_by_id).and_return(nil) } it 'fails activity with a provided error' do described_class.fail_activity(async_token, StandardError.new('something went wrong')) - expect(client) + expect(connection) .to have_received(:respond_activity_task_failed_by_id) .with( domain: domain,