Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/temporal/activity/poller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def poll_for_task
def process(task)
middleware_chain = Middleware::Chain.new(middleware)

TaskProcessor.new(task, namespace, activity_lookup, middleware_chain, config, heartbeat_thread_pool).process
TaskProcessor.new(task, task_queue, namespace, activity_lookup, middleware_chain, config, heartbeat_thread_pool).process
end

def poll_retry_seconds
Expand Down
18 changes: 14 additions & 4 deletions lib/temporal/activity/task_processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ class Activity
class TaskProcessor
include Concerns::Payloads

def initialize(task, namespace, activity_lookup, middleware_chain, config, heartbeat_thread_pool)
def initialize(task, task_queue, namespace, activity_lookup, middleware_chain, config, heartbeat_thread_pool)
@task = task
@task_queue = task_queue
@namespace = namespace
@metadata = Metadata.generate_activity_metadata(task, namespace)
@task_token = task.task_token
Expand All @@ -28,7 +29,7 @@ def process
start_time = Time.now

Temporal.logger.debug("Processing Activity task", metadata.to_h)
Temporal.metrics.timing(Temporal::MetricKeys::ACTIVITY_TASK_QUEUE_TIME, queue_time_ms, activity: activity_name, namespace: namespace, workflow: metadata.workflow_name)
Temporal.metrics.timing(Temporal::MetricKeys::ACTIVITY_TASK_QUEUE_TIME, queue_time_ms, metric_tags)

context = Activity::Context.new(connection, metadata, config, heartbeat_thread_pool)

Expand All @@ -52,13 +53,22 @@ def process
end

time_diff_ms = ((Time.now - start_time) * 1000).round
Temporal.metrics.timing(Temporal::MetricKeys::ACTIVITY_TASK_LATENCY, time_diff_ms, activity: activity_name, namespace: namespace, workflow: metadata.workflow_name)
Temporal.metrics.timing(Temporal::MetricKeys::ACTIVITY_TASK_LATENCY, time_diff_ms, metric_tags)
Temporal.logger.debug("Activity task processed", metadata.to_h.merge(execution_time: time_diff_ms))
end

def metric_tags
{
activity: activity_name,
namespace: namespace,
task_queue: task_queue,
workflow: metadata.workflow_name
}
end

private

attr_reader :task, :namespace, :task_token, :activity_name, :activity_class,
attr_reader :task, :task_queue, :namespace, :task_token, :activity_name, :activity_class,
:middleware_chain, :metadata, :config, :heartbeat_thread_pool

def connection
Expand Down
4 changes: 2 additions & 2 deletions lib/temporal/workflow/poller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ def process(task)
middleware_chain = Middleware::Chain.new(middleware)
workflow_middleware_chain = Middleware::Chain.new(workflow_middleware)

TaskProcessor.new(task, namespace, workflow_lookup, middleware_chain, workflow_middleware_chain, config,
binary_checksum).process
TaskProcessor.new(task, task_queue, namespace, workflow_lookup, middleware_chain, workflow_middleware_chain,
config, binary_checksum).process
end

def thread_pool
Expand Down
24 changes: 15 additions & 9 deletions lib/temporal/workflow/task_processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ def query_args
MAX_FAILED_ATTEMPTS = 1
LEGACY_QUERY_KEY = :legacy_query

def initialize(task, namespace, workflow_lookup, middleware_chain, workflow_middleware_chain, config, binary_checksum)
def initialize(task, task_queue, namespace, workflow_lookup, middleware_chain, workflow_middleware_chain, config, binary_checksum)
@task = task
@task_queue = task_queue
@namespace = namespace
@metadata = Metadata.generate_workflow_task_metadata(task, namespace)
@task_token = task.task_token
Expand All @@ -40,9 +41,8 @@ def initialize(task, namespace, workflow_lookup, middleware_chain, workflow_midd
def process
start_time = Time.now

Temporal.logger.debug('Processing Workflow task', metadata.to_h)
Temporal.metrics.timing(Temporal::MetricKeys::WORKFLOW_TASK_QUEUE_TIME, queue_time_ms, workflow: workflow_name,
namespace: namespace)
Temporal.logger.debug("Processing Workflow task", metadata.to_h)
Temporal.metrics.timing(Temporal::MetricKeys::WORKFLOW_TASK_QUEUE_TIME, queue_time_ms, metric_tags)

raise Temporal::WorkflowNotRegistered, 'Workflow is not registered with this worker' unless workflow_class

Expand Down Expand Up @@ -73,14 +73,21 @@ def process
fail_task(e)
ensure
time_diff_ms = ((Time.now - start_time) * 1000).round
Temporal.metrics.timing(Temporal::MetricKeys::WORKFLOW_TASK_LATENCY, time_diff_ms, workflow: workflow_name,
namespace: namespace)
Temporal.metrics.timing(Temporal::MetricKeys::WORKFLOW_TASK_LATENCY, time_diff_ms, metric_tags)
Temporal.logger.debug('Workflow task processed', metadata.to_h.merge(execution_time: time_diff_ms))
end

def metric_tags
{
workflow: workflow_name,
namespace: namespace,
task_queue: task_queue
}
end

private

attr_reader :task, :namespace, :task_token, :workflow_name, :workflow_class,
attr_reader :task, :task_queue, :namespace, :task_token, :workflow_name, :workflow_class,
:middleware_chain, :workflow_middleware_chain, :metadata, :config, :binary_checksum

def connection
Expand Down Expand Up @@ -154,8 +161,7 @@ def complete_query(result)
end

def fail_task(error)
Temporal.metrics.increment(Temporal::MetricKeys::WORKFLOW_TASK_EXECUTION_FAILED, workflow: workflow_name,
namespace: namespace)
Temporal.metrics.increment(Temporal::MetricKeys::WORKFLOW_TASK_EXECUTION_FAILED, metric_tags)
Temporal.logger.error('Workflow task failed', metadata.to_h.merge(error: error.inspect))
Temporal.logger.debug(error.backtrace.join("\n"))

Expand Down
4 changes: 2 additions & 2 deletions spec/unit/lib/temporal/activity/poller_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def poll(task, times: 1)

expect(Temporal::Activity::TaskProcessor)
.to have_received(:new)
.with(task, namespace, lookup, middleware_chain, config, heartbeat_thread_pool)
.with(task, task_queue, namespace, lookup, middleware_chain, config, heartbeat_thread_pool)
expect(task_processor).to have_received(:process)
end

Expand Down Expand Up @@ -143,7 +143,7 @@ def call(_); end
expect(Temporal::Middleware::Chain).to have_received(:new).with(middleware)
expect(Temporal::Activity::TaskProcessor)
.to have_received(:new)
.with(task, namespace, lookup, middleware_chain, config, heartbeat_thread_pool)
.with(task, task_queue, namespace, lookup, middleware_chain, config, heartbeat_thread_pool)
end
end
end
Expand Down
21 changes: 14 additions & 7 deletions spec/unit/lib/temporal/activity/task_processor_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@
require 'temporal/scheduled_thread_pool'

describe Temporal::Activity::TaskProcessor do
subject { described_class.new(task, namespace, lookup, middleware_chain, config, heartbeat_thread_pool) }
subject { described_class.new(task, task_queue, namespace, lookup, middleware_chain, config, heartbeat_thread_pool) }

let(:namespace) { 'test-namespace' }
let(:task_queue) { 'test-queue' }
let(:lookup) { instance_double('Temporal::ExecutableLookup', find: nil) }
let(:task) do
Fabricate(
Expand Down Expand Up @@ -149,9 +150,11 @@
.with(
Temporal::MetricKeys::ACTIVITY_TASK_QUEUE_TIME,
an_instance_of(Integer),
activity: activity_name,
namespace: namespace,
workflow: workflow_name
hash_including({
activity: activity_name,
namespace: namespace,
workflow: workflow_name
})
)
end

Expand All @@ -165,6 +168,7 @@
an_instance_of(Integer),
activity: activity_name,
namespace: namespace,
task_queue: task_queue,
workflow: workflow_name
)
end
Expand Down Expand Up @@ -240,9 +244,11 @@
.with(
Temporal::MetricKeys::ACTIVITY_TASK_QUEUE_TIME,
an_instance_of(Integer),
activity: activity_name,
namespace: namespace,
workflow: workflow_name
hash_including({
activity: activity_name,
namespace: namespace,
workflow: workflow_name
})
)
end

Expand All @@ -256,6 +262,7 @@
an_instance_of(Integer),
activity: activity_name,
namespace: namespace,
task_queue: task_queue,
workflow: workflow_name
)
end
Expand Down
4 changes: 2 additions & 2 deletions spec/unit/lib/temporal/workflow/poller_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def poll(task, times: 1)

expect(Temporal::Workflow::TaskProcessor)
.to have_received(:new)
.with(task, namespace, lookup, empty_middleware_chain, empty_middleware_chain, config, binary_checksum)
.with(task, task_queue, namespace, lookup, empty_middleware_chain, empty_middleware_chain, config, binary_checksum)
expect(task_processor).to have_received(:process)
end

Expand Down Expand Up @@ -151,7 +151,7 @@ def call(_); end
expect(Temporal::Middleware::Chain).to have_received(:new).with(workflow_middleware)
expect(Temporal::Workflow::TaskProcessor)
.to have_received(:new)
.with(task, namespace, lookup, middleware_chain, workflow_middleware_chain, config, binary_checksum)
.with(task, task_queue, namespace, lookup, middleware_chain, workflow_middleware_chain, config, binary_checksum)
end
end
end
Expand Down
39 changes: 26 additions & 13 deletions spec/unit/lib/temporal/workflow/task_processor_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@

describe Temporal::Workflow::TaskProcessor do
subject do
described_class.new(task, namespace, lookup, middleware_chain, workflow_middleware_chain, config, binary_checksum)
described_class.new(task, task_queue, namespace, lookup, middleware_chain, workflow_middleware_chain, config, binary_checksum)
end

let(:namespace) { 'test-namespace' }
let(:task_queue) { 'test-queue' }
let(:lookup) { instance_double('Temporal::ExecutableLookup', find: nil) }
let(:query) { nil }
let(:queries) { nil }
Expand Down Expand Up @@ -73,8 +74,10 @@
.to have_received(:increment)
.with(
Temporal::MetricKeys::WORKFLOW_TASK_EXECUTION_FAILED,
workflow: workflow_name,
namespace: namespace
hash_including({
workflow: workflow_name,
namespace: namespace
})
)
end
end
Expand Down Expand Up @@ -203,8 +206,10 @@
.with(
Temporal::MetricKeys::WORKFLOW_TASK_QUEUE_TIME,
an_instance_of(Integer),
workflow: workflow_name,
namespace: namespace
hash_including({
workflow: workflow_name,
namespace: namespace
})
)
end

Expand All @@ -216,8 +221,10 @@
.with(
Temporal::MetricKeys::WORKFLOW_TASK_LATENCY,
an_instance_of(Integer),
workflow: workflow_name,
namespace: namespace
hash_including({
workflow: workflow_name,
namespace: namespace
})
)
end
end
Expand Down Expand Up @@ -251,8 +258,10 @@
.to have_received(:increment)
.with(
Temporal::MetricKeys::WORKFLOW_TASK_EXECUTION_FAILED,
workflow: workflow_name,
namespace: namespace
hash_including({
workflow: workflow_name,
namespace: namespace
})
)
end
end
Expand Down Expand Up @@ -312,8 +321,10 @@
.with(
Temporal::MetricKeys::WORKFLOW_TASK_QUEUE_TIME,
an_instance_of(Integer),
workflow: workflow_name,
namespace: namespace
hash_including({
workflow: workflow_name,
namespace: namespace
})
)
end

Expand All @@ -325,8 +336,10 @@
.with(
Temporal::MetricKeys::WORKFLOW_TASK_LATENCY,
an_instance_of(Integer),
workflow: workflow_name,
namespace: namespace
hash_including({
workflow: workflow_name,
namespace: namespace
})
)
end
end
Expand Down