Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 24 additions & 19 deletions lib/cadence.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
require 'securerandom'
require 'cadence/configuration'
require 'cadence/execution_options'
require 'cadence/client'
require 'cadence/connection'
require 'cadence/activity'
require 'cadence/activity/async_token'
require 'cadence/workflow'
Expand All @@ -15,10 +15,10 @@ def start_workflow(workflow, *input, **args)
options = args.delete(:options) || {}
input << args unless args.empty?

execution_options = ExecutionOptions.new(workflow, options)
execution_options = ExecutionOptions.new(workflow, options, config.default_execution_options)
workflow_id = options[:workflow_id] || SecureRandom.uuid

response = client.start_workflow_execution(
response = connection.start_workflow_execution(
domain: execution_options.domain,
workflow_id: workflow_id,
workflow_name: execution_options.name,
Expand All @@ -37,10 +37,10 @@ def schedule_workflow(workflow, cron_schedule, *input, **args)
options = args.delete(:options) || {}
input << args unless args.empty?

execution_options = ExecutionOptions.new(workflow, options)
execution_options = ExecutionOptions.new(workflow, options, config.default_execution_options)
workflow_id = options[:workflow_id] || SecureRandom.uuid

response = client.start_workflow_execution(
response = connection.start_workflow_execution(
domain: execution_options.domain,
workflow_id: workflow_id,
workflow_name: execution_options.name,
Expand All @@ -57,13 +57,13 @@ def schedule_workflow(workflow, cron_schedule, *input, **args)
end

def register_domain(name, description = nil)
client.register_domain(name: name, description: description)
connection.register_domain(name: name, description: description)
rescue CadenceThrift::DomainAlreadyExistsError
nil
end

def signal_workflow(workflow, signal, workflow_id, run_id, input = nil)
client.signal_workflow_execution(
connection.signal_workflow_execution(
domain: workflow.domain, # TODO: allow passing domain instead
workflow_id: workflow_id,
run_id: run_id,
Expand All @@ -76,7 +76,7 @@ def reset_workflow(domain, workflow_id, run_id, decision_task_id: nil, reason: '
decision_task_id ||= get_last_completed_decision_task(domain, workflow_id, run_id)
raise Error, 'Could not find a completed decision task event' unless decision_task_id

response = client.reset_workflow_execution(
response = connection.reset_workflow_execution(
domain: domain,
workflow_id: workflow_id,
run_id: run_id,
Expand All @@ -88,7 +88,7 @@ def reset_workflow(domain, workflow_id, run_id, decision_task_id: nil, reason: '
end

def terminate_workflow(domain, workflow_id, run_id, reason: 'manual termination', details: nil)
client.terminate_workflow_execution(
connection.terminate_workflow_execution(
domain: domain,
workflow_id: workflow_id,
run_id: run_id,
Expand All @@ -98,7 +98,7 @@ def terminate_workflow(domain, workflow_id, run_id, reason: 'manual termination'
end

def fetch_workflow_execution_info(domain, workflow_id, run_id)
response = client.describe_workflow_execution(
response = connection.describe_workflow_execution(
domain: domain,
workflow_id: workflow_id,
run_id: run_id
Expand All @@ -110,7 +110,7 @@ def fetch_workflow_execution_info(domain, workflow_id, run_id)
def complete_activity(async_token, result = nil)
details = Activity::AsyncToken.decode(async_token)

client.respond_activity_task_completed_by_id(
connection.respond_activity_task_completed_by_id(
domain: details.domain,
activity_id: details.activity_id,
workflow_id: details.workflow_id,
Expand All @@ -122,7 +122,7 @@ def complete_activity(async_token, result = nil)
def fail_activity(async_token, error)
details = Activity::AsyncToken.decode(async_token)

client.respond_activity_task_failed_by_id(
connection.respond_activity_task_failed_by_id(
domain: details.domain,
activity_id: details.activity_id,
workflow_id: details.workflow_id,
Expand All @@ -133,23 +133,24 @@ def fail_activity(async_token, error)
end

def configure(&block)
yield configuration
yield config
end

def configuration
@configuration ||= Configuration.new
warn '[DEPRECATION] This method is now deprecated without a substitution'
config
end

def logger
configuration.logger
config.logger
end

def metrics
@metrics ||= Metrics.new(configuration.metrics_adapter)
@metrics ||= Metrics.new(config.metrics_adapter)
end

def get_workflow_history(domain:, workflow_id:, run_id:)
history_response = client.get_workflow_execution_history(
history_response = connection.get_workflow_execution_history(
domain: domain,
workflow_id: workflow_id,
run_id: run_id
Expand All @@ -159,8 +160,12 @@ def get_workflow_history(domain:, workflow_id:, run_id:)

private

def client
@client ||= Cadence::Client.generate
def config
@config ||= Configuration.new
end

def connection
@connection ||= Cadence::Connection.generate(config.for_connection)
end

def get_last_completed_decision_task(domain, workflow_id, run_id)
Expand Down
8 changes: 4 additions & 4 deletions lib/cadence/activity/context.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
module Cadence
class Activity
class Context
def initialize(client, metadata)
@client = client
def initialize(connection, metadata)
@connection = connection
@metadata = metadata
@async = false
end
Expand All @@ -32,7 +32,7 @@ def async_token

def heartbeat(details = nil)
logger.debug('Activity heartbeat')
client.record_activity_task_heartbeat(task_token: task_token, details: details)
connection.record_activity_task_heartbeat(task_token: task_token, details: details)
end

def logger
Expand All @@ -54,7 +54,7 @@ def headers

private

attr_reader :client, :metadata
attr_reader :connection, :metadata

def task_token
metadata.task_token
Expand Down
16 changes: 8 additions & 8 deletions lib/cadence/activity/poller.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
require 'cadence/client'
require 'cadence/connection'
require 'cadence/thread_pool'
require 'cadence/middleware/chain'
require 'cadence/activity/task_processor'
Expand All @@ -10,10 +10,11 @@ class Poller
thread_pool_size: 20
}.freeze

def initialize(domain, task_list, activity_lookup, middleware = [], options = {})
def initialize(domain, task_list, activity_lookup, config, middleware = [], options = {})
@domain = domain
@task_list = task_list
@activity_lookup = activity_lookup
@config = config
@middleware = middleware
@options = DEFAULT_OPTIONS.merge(options)
@shutting_down = false
Expand All @@ -36,10 +37,10 @@ def wait

private

attr_reader :domain, :task_list, :activity_lookup, :middleware, :options, :thread
attr_reader :domain, :task_list, :activity_lookup, :config, :middleware, :options, :thread

def client
@client ||= Cadence::Client.generate(options)
def connection
@connection ||= Cadence::Connection.generate(config.for_connection, options)
end

def shutting_down?
Expand Down Expand Up @@ -68,17 +69,16 @@ def poll_loop
end

def poll_for_task
client.poll_for_activity_task(domain: domain, task_list: task_list)
connection.poll_for_activity_task(domain: domain, task_list: task_list)
rescue StandardError => error
Cadence.logger.error("Unable to poll for an activity task: #{error.inspect}")
nil
end

def process(task)
client = Cadence::Client.generate
middleware_chain = Middleware::Chain.new(middleware)

TaskProcessor.new(task, domain, activity_lookup, client, middleware_chain).process
TaskProcessor.new(task, domain, activity_lookup, middleware_chain, config).process
end

def thread_pool
Expand Down
18 changes: 12 additions & 6 deletions lib/cadence/activity/task_processor.rb
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
require 'cadence/metadata'
require 'cadence/activity/context'
require 'cadence/json'
require 'cadence/connection'

module Cadence
class Activity
class TaskProcessor
def initialize(task, domain, activity_lookup, client, middleware_chain)
def initialize(task, domain, activity_lookup, middleware_chain, config)
@task = task
@domain = domain
@task_token = task.taskToken
@activity_name = task.activityType.name
@activity_class = activity_lookup.find(activity_name)
@client = client
@middleware_chain = middleware_chain
@config = config
end

def process
Expand All @@ -27,7 +28,7 @@ def process
end

metadata = Metadata.generate(Metadata::ACTIVITY_TYPE, task, domain)
context = Activity::Context.new(client, metadata)
context = Activity::Context.new(connection, metadata)

result = middleware_chain.invoke(metadata) do
activity_class.execute_in_context(context, JSON.deserialize(task.input))
Expand All @@ -49,22 +50,27 @@ def process

private

attr_reader :task, :domain, :task_token, :activity_name, :activity_class, :client, :middleware_chain
attr_reader :task, :domain, :task_token, :activity_name, :activity_class,
:middleware_chain, :config

def connection
@connection ||= Cadence::Connection.generate(config.for_connection)
end

def queue_time_ms
((task.startedTimestamp - task.scheduledTimestampOfThisAttempt) / 1_000_000).round
end

def respond_completed(result)
Cadence.logger.info("Activity #{activity_name} completed")
client.respond_activity_task_completed(task_token: task_token, result: result)
connection.respond_activity_task_completed(task_token: task_token, result: result)
rescue StandardError => error
Cadence.logger.error("Unable to complete Activity #{activity_name}: #{error.inspect}")
end

def respond_failed(reason, details)
Cadence.logger.error("Activity #{activity_name} failed with: #{reason}")
client.respond_activity_task_failed(task_token: task_token, reason: reason, details: details)
connection.respond_activity_task_failed(task_token: task_token, reason: reason, details: details)
rescue StandardError => error
Cadence.logger.error("Unable to fail Activity #{activity_name}: #{error.inspect}")
end
Expand Down
21 changes: 0 additions & 21 deletions lib/cadence/client.rb

This file was deleted.

24 changes: 22 additions & 2 deletions lib/cadence/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@

module Cadence
class Configuration
Connection = Struct.new(:type, :host, :port, keyword_init: true)
Execution = Struct.new(:domain, :task_list, :timeouts, :headers, keyword_init: true)

attr_reader :timeouts
attr_accessor :client_type, :host, :port, :logger, :metrics_adapter, :domain, :task_list, :headers
attr_accessor :connection_type, :host, :port, :logger, :metrics_adapter, :domain, :task_list, :headers

DEFAULT_TIMEOUTS = {
execution: 60, # End-to-end workflow time
Expand All @@ -20,7 +23,7 @@ class Configuration
DEFAULT_TASK_LIST = 'default-task-list'.freeze

def initialize
@client_type = :thrift
@connection_type = :thrift
@logger = Logger.new(STDOUT, progname: 'cadence_client')
@metrics_adapter = MetricsAdapters::Null.new
@timeouts = DEFAULT_TIMEOUTS
Expand All @@ -32,5 +35,22 @@ def initialize
def timeouts=(new_timeouts)
@timeouts = DEFAULT_TIMEOUTS.merge(new_timeouts)
end

def for_connection
Connection.new(
type: connection_type,
host: host,
port: port
).freeze
end

def default_execution_options
Execution.new(
domain: domain,
task_list: task_list,
timeouts: timeouts,
headers: headers
).freeze
end
end
end
21 changes: 21 additions & 0 deletions lib/cadence/connection.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
require 'cadence/connection/thrift'

module Cadence
module Connection
CLIENT_TYPES_MAP = {
thrift: Cadence::Connection::Thrift
}.freeze

def self.generate(configuration, options = {})
connection_class = CLIENT_TYPES_MAP[configuration.type]
host = configuration.host
port = configuration.port

hostname = `hostname`

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Socket.gethostname avoids invoking another program

Copy link
Contributor Author

@antstorm antstorm Mar 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, nice, didn't know of this one. I'll add it as a follow-up PR (we had hostname for a while and I want to keep changes related to config in this PR)

thread_id = Thread.current.object_id
identity = "#{thread_id}@#{hostname}"

connection_class.new(host, port, identity, options)
end
end
end
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
module Cadence
module Client
module Connection
class Error < StandardError; end

# incorrect arguments passed to the client
# incorrect arguments passed to the connection
class ArgumentError < Error; end
end
end
Loading