diff --git a/lib/temporal/activity/poller.rb b/lib/temporal/activity/poller.rb index 40259f16..29c0977d 100644 --- a/lib/temporal/activity/poller.rb +++ b/lib/temporal/activity/poller.rb @@ -108,7 +108,7 @@ def poll_for_task def process(task) middleware_chain = Middleware::Chain.new(middleware) - TaskProcessor.new(task, namespace, activity_lookup, middleware_chain, config, heartbeat_thread_pool).process + TaskProcessor.new(task, task_queue, namespace, activity_lookup, middleware_chain, config, heartbeat_thread_pool).process end def poll_retry_seconds diff --git a/lib/temporal/activity/task_processor.rb b/lib/temporal/activity/task_processor.rb index 51ae5408..35f4bbc6 100644 --- a/lib/temporal/activity/task_processor.rb +++ b/lib/temporal/activity/task_processor.rb @@ -12,8 +12,9 @@ class Activity class TaskProcessor include Concerns::Payloads - def initialize(task, namespace, activity_lookup, middleware_chain, config, heartbeat_thread_pool) + def initialize(task, task_queue, namespace, activity_lookup, middleware_chain, config, heartbeat_thread_pool) @task = task + @task_queue = task_queue @namespace = namespace @metadata = Metadata.generate_activity_metadata(task, namespace) @task_token = task.task_token @@ -28,7 +29,7 @@ def process start_time = Time.now Temporal.logger.debug("Processing Activity task", metadata.to_h) - Temporal.metrics.timing(Temporal::MetricKeys::ACTIVITY_TASK_QUEUE_TIME, queue_time_ms, activity: activity_name, namespace: namespace, workflow: metadata.workflow_name) + Temporal.metrics.timing(Temporal::MetricKeys::ACTIVITY_TASK_QUEUE_TIME, queue_time_ms, metric_tags) context = Activity::Context.new(connection, metadata, config, heartbeat_thread_pool) @@ -52,13 +53,22 @@ def process end time_diff_ms = ((Time.now - start_time) * 1000).round - Temporal.metrics.timing(Temporal::MetricKeys::ACTIVITY_TASK_LATENCY, time_diff_ms, activity: activity_name, namespace: namespace, workflow: metadata.workflow_name) + Temporal.metrics.timing(Temporal::MetricKeys::ACTIVITY_TASK_LATENCY, time_diff_ms, metric_tags) Temporal.logger.debug("Activity task processed", metadata.to_h.merge(execution_time: time_diff_ms)) end + def metric_tags + { + activity: activity_name, + namespace: namespace, + task_queue: task_queue, + workflow: metadata.workflow_name + } + end + private - attr_reader :task, :namespace, :task_token, :activity_name, :activity_class, + attr_reader :task, :task_queue, :namespace, :task_token, :activity_name, :activity_class, :middleware_chain, :metadata, :config, :heartbeat_thread_pool def connection diff --git a/lib/temporal/workflow/poller.rb b/lib/temporal/workflow/poller.rb index 89fed958..198f4502 100644 --- a/lib/temporal/workflow/poller.rb +++ b/lib/temporal/workflow/poller.rb @@ -113,8 +113,8 @@ def process(task) middleware_chain = Middleware::Chain.new(middleware) workflow_middleware_chain = Middleware::Chain.new(workflow_middleware) - TaskProcessor.new(task, namespace, workflow_lookup, middleware_chain, workflow_middleware_chain, config, - binary_checksum).process + TaskProcessor.new(task, task_queue, namespace, workflow_lookup, middleware_chain, workflow_middleware_chain, + config, binary_checksum).process end def thread_pool diff --git a/lib/temporal/workflow/task_processor.rb b/lib/temporal/workflow/task_processor.rb index 9b79b454..f415aed5 100644 --- a/lib/temporal/workflow/task_processor.rb +++ b/lib/temporal/workflow/task_processor.rb @@ -24,8 +24,9 @@ def query_args MAX_FAILED_ATTEMPTS = 1 LEGACY_QUERY_KEY = :legacy_query - def initialize(task, namespace, workflow_lookup, middleware_chain, workflow_middleware_chain, config, binary_checksum) + def initialize(task, task_queue, namespace, workflow_lookup, middleware_chain, workflow_middleware_chain, config, binary_checksum) @task = task + @task_queue = task_queue @namespace = namespace @metadata = Metadata.generate_workflow_task_metadata(task, namespace) @task_token = task.task_token @@ -40,9 +41,8 @@ def initialize(task, namespace, workflow_lookup, middleware_chain, workflow_midd def process start_time = Time.now - Temporal.logger.debug('Processing Workflow task', metadata.to_h) - Temporal.metrics.timing(Temporal::MetricKeys::WORKFLOW_TASK_QUEUE_TIME, queue_time_ms, workflow: workflow_name, - namespace: namespace) + Temporal.logger.debug("Processing Workflow task", metadata.to_h) + Temporal.metrics.timing(Temporal::MetricKeys::WORKFLOW_TASK_QUEUE_TIME, queue_time_ms, metric_tags) raise Temporal::WorkflowNotRegistered, 'Workflow is not registered with this worker' unless workflow_class @@ -73,14 +73,21 @@ def process fail_task(e) ensure time_diff_ms = ((Time.now - start_time) * 1000).round - Temporal.metrics.timing(Temporal::MetricKeys::WORKFLOW_TASK_LATENCY, time_diff_ms, workflow: workflow_name, - namespace: namespace) + Temporal.metrics.timing(Temporal::MetricKeys::WORKFLOW_TASK_LATENCY, time_diff_ms, metric_tags) Temporal.logger.debug('Workflow task processed', metadata.to_h.merge(execution_time: time_diff_ms)) end + def metric_tags + { + workflow: workflow_name, + namespace: namespace, + task_queue: task_queue + } + end + private - attr_reader :task, :namespace, :task_token, :workflow_name, :workflow_class, + attr_reader :task, :task_queue, :namespace, :task_token, :workflow_name, :workflow_class, :middleware_chain, :workflow_middleware_chain, :metadata, :config, :binary_checksum def connection @@ -154,8 +161,7 @@ def complete_query(result) end def fail_task(error) - Temporal.metrics.increment(Temporal::MetricKeys::WORKFLOW_TASK_EXECUTION_FAILED, workflow: workflow_name, - namespace: namespace) + Temporal.metrics.increment(Temporal::MetricKeys::WORKFLOW_TASK_EXECUTION_FAILED, metric_tags) Temporal.logger.error('Workflow task failed', metadata.to_h.merge(error: error.inspect)) Temporal.logger.debug(error.backtrace.join("\n")) diff --git a/spec/unit/lib/temporal/activity/poller_spec.rb b/spec/unit/lib/temporal/activity/poller_spec.rb index 0476e950..76d8396a 100644 --- a/spec/unit/lib/temporal/activity/poller_spec.rb +++ b/spec/unit/lib/temporal/activity/poller_spec.rb @@ -108,7 +108,7 @@ def poll(task, times: 1) expect(Temporal::Activity::TaskProcessor) .to have_received(:new) - .with(task, namespace, lookup, middleware_chain, config, heartbeat_thread_pool) + .with(task, task_queue, namespace, lookup, middleware_chain, config, heartbeat_thread_pool) expect(task_processor).to have_received(:process) end @@ -143,7 +143,7 @@ def call(_); end expect(Temporal::Middleware::Chain).to have_received(:new).with(middleware) expect(Temporal::Activity::TaskProcessor) .to have_received(:new) - .with(task, namespace, lookup, middleware_chain, config, heartbeat_thread_pool) + .with(task, task_queue, namespace, lookup, middleware_chain, config, heartbeat_thread_pool) end end end diff --git a/spec/unit/lib/temporal/activity/task_processor_spec.rb b/spec/unit/lib/temporal/activity/task_processor_spec.rb index 41ea952f..e4ccdb2a 100644 --- a/spec/unit/lib/temporal/activity/task_processor_spec.rb +++ b/spec/unit/lib/temporal/activity/task_processor_spec.rb @@ -5,9 +5,10 @@ require 'temporal/scheduled_thread_pool' describe Temporal::Activity::TaskProcessor do - subject { described_class.new(task, namespace, lookup, middleware_chain, config, heartbeat_thread_pool) } + subject { described_class.new(task, task_queue, namespace, lookup, middleware_chain, config, heartbeat_thread_pool) } let(:namespace) { 'test-namespace' } + let(:task_queue) { 'test-queue' } let(:lookup) { instance_double('Temporal::ExecutableLookup', find: nil) } let(:task) do Fabricate( @@ -149,9 +150,11 @@ .with( Temporal::MetricKeys::ACTIVITY_TASK_QUEUE_TIME, an_instance_of(Integer), - activity: activity_name, - namespace: namespace, - workflow: workflow_name + hash_including({ + activity: activity_name, + namespace: namespace, + workflow: workflow_name + }) ) end @@ -165,6 +168,7 @@ an_instance_of(Integer), activity: activity_name, namespace: namespace, + task_queue: task_queue, workflow: workflow_name ) end @@ -240,9 +244,11 @@ .with( Temporal::MetricKeys::ACTIVITY_TASK_QUEUE_TIME, an_instance_of(Integer), - activity: activity_name, - namespace: namespace, - workflow: workflow_name + hash_including({ + activity: activity_name, + namespace: namespace, + workflow: workflow_name + }) ) end @@ -256,6 +262,7 @@ an_instance_of(Integer), activity: activity_name, namespace: namespace, + task_queue: task_queue, workflow: workflow_name ) end diff --git a/spec/unit/lib/temporal/workflow/poller_spec.rb b/spec/unit/lib/temporal/workflow/poller_spec.rb index e8d5692b..020e2e91 100644 --- a/spec/unit/lib/temporal/workflow/poller_spec.rb +++ b/spec/unit/lib/temporal/workflow/poller_spec.rb @@ -113,7 +113,7 @@ def poll(task, times: 1) expect(Temporal::Workflow::TaskProcessor) .to have_received(:new) - .with(task, namespace, lookup, empty_middleware_chain, empty_middleware_chain, config, binary_checksum) + .with(task, task_queue, namespace, lookup, empty_middleware_chain, empty_middleware_chain, config, binary_checksum) expect(task_processor).to have_received(:process) end @@ -151,7 +151,7 @@ def call(_); end expect(Temporal::Middleware::Chain).to have_received(:new).with(workflow_middleware) expect(Temporal::Workflow::TaskProcessor) .to have_received(:new) - .with(task, namespace, lookup, middleware_chain, workflow_middleware_chain, config, binary_checksum) + .with(task, task_queue, namespace, lookup, middleware_chain, workflow_middleware_chain, config, binary_checksum) end end end diff --git a/spec/unit/lib/temporal/workflow/task_processor_spec.rb b/spec/unit/lib/temporal/workflow/task_processor_spec.rb index 33d5506f..6ad3c12c 100644 --- a/spec/unit/lib/temporal/workflow/task_processor_spec.rb +++ b/spec/unit/lib/temporal/workflow/task_processor_spec.rb @@ -5,10 +5,11 @@ describe Temporal::Workflow::TaskProcessor do subject do - described_class.new(task, namespace, lookup, middleware_chain, workflow_middleware_chain, config, binary_checksum) + described_class.new(task, task_queue, namespace, lookup, middleware_chain, workflow_middleware_chain, config, binary_checksum) end let(:namespace) { 'test-namespace' } + let(:task_queue) { 'test-queue' } let(:lookup) { instance_double('Temporal::ExecutableLookup', find: nil) } let(:query) { nil } let(:queries) { nil } @@ -73,8 +74,10 @@ .to have_received(:increment) .with( Temporal::MetricKeys::WORKFLOW_TASK_EXECUTION_FAILED, - workflow: workflow_name, - namespace: namespace + hash_including({ + workflow: workflow_name, + namespace: namespace + }) ) end end @@ -203,8 +206,10 @@ .with( Temporal::MetricKeys::WORKFLOW_TASK_QUEUE_TIME, an_instance_of(Integer), - workflow: workflow_name, - namespace: namespace + hash_including({ + workflow: workflow_name, + namespace: namespace + }) ) end @@ -216,8 +221,10 @@ .with( Temporal::MetricKeys::WORKFLOW_TASK_LATENCY, an_instance_of(Integer), - workflow: workflow_name, - namespace: namespace + hash_including({ + workflow: workflow_name, + namespace: namespace + }) ) end end @@ -251,8 +258,10 @@ .to have_received(:increment) .with( Temporal::MetricKeys::WORKFLOW_TASK_EXECUTION_FAILED, - workflow: workflow_name, - namespace: namespace + hash_including({ + workflow: workflow_name, + namespace: namespace + }) ) end end @@ -312,8 +321,10 @@ .with( Temporal::MetricKeys::WORKFLOW_TASK_QUEUE_TIME, an_instance_of(Integer), - workflow: workflow_name, - namespace: namespace + hash_including({ + workflow: workflow_name, + namespace: namespace + }) ) end @@ -325,8 +336,10 @@ .with( Temporal::MetricKeys::WORKFLOW_TASK_LATENCY, an_instance_of(Integer), - workflow: workflow_name, - namespace: namespace + hash_including({ + workflow: workflow_name, + namespace: namespace + }) ) end end