From 31638cc096041f470e89d8a127bc4b73f1d8ebfe Mon Sep 17 00:00:00 2001 From: Rob Holland Date: Thu, 25 Feb 2021 17:57:38 +0000 Subject: [PATCH 1/2] Allow varying options per activity task queue. This also adds support for server-side rate limiting of activities per second for a given task queue. --- lib/temporal/activity/poller.rb | 2 +- lib/temporal/client/grpc_client.rb | 5 ++++- lib/temporal/worker.rb | 15 ++++++++++++--- .../unit/lib/temporal/activity/poller_spec.rb | 2 +- spec/unit/lib/temporal/worker_spec.rb | 19 ++++++++++++++----- 5 files changed, 32 insertions(+), 11 deletions(-) diff --git a/lib/temporal/activity/poller.rb b/lib/temporal/activity/poller.rb index b9d7276e..2db90fd8 100644 --- a/lib/temporal/activity/poller.rb +++ b/lib/temporal/activity/poller.rb @@ -66,7 +66,7 @@ def poll_loop end def poll_for_task - client.poll_activity_task_queue(namespace: namespace, task_queue: task_queue) + client.poll_activity_task_queue(namespace: namespace, task_queue: task_queue, max_tasks_per_second: options[:max_tasks_per_second]) rescue StandardError => error Temporal.logger.error("Unable to poll activity task queue: #{error.inspect}") nil diff --git a/lib/temporal/client/grpc_client.rb b/lib/temporal/client/grpc_client.rb index eab4f1a3..90ffca89 100644 --- a/lib/temporal/client/grpc_client.rb +++ b/lib/temporal/client/grpc_client.rb @@ -160,12 +160,15 @@ def respond_workflow_task_failed(task_token:, cause:, exception: nil) client.respond_workflow_task_failed(request) end - def poll_activity_task_queue(namespace:, task_queue:) + def poll_activity_task_queue(namespace:, task_queue:, max_tasks_per_second: nil) request = Temporal::Api::WorkflowService::V1::PollActivityTaskQueueRequest.new( identity: identity, namespace: namespace, task_queue: Temporal::Api::TaskQueue::V1::TaskQueue.new( name: task_queue + ), + task_queue_meta: Temporal::Api::TaskQueue::V1::TaskQueueMetadata.new( + max_tasks_per_second: max_tasks_per_second ) ) diff --git a/lib/temporal/worker.rb b/lib/temporal/worker.rb index ca2bf870..57b5ecf6 100644 --- a/lib/temporal/worker.rb +++ b/lib/temporal/worker.rb @@ -16,9 +16,10 @@ def initialize(activity_thread_pool_size: Temporal::Activity::Poller::DEFAULT_OP @workflow_task_middleware = [] @activity_middleware = [] @shutting_down = false - @activity_poller_options = { + @default_activity_poller_options = { thread_pool_size: activity_thread_pool_size, } + @activity_poller_options = Hash.new { {} } end def register_workflow(workflow_class, options = {}) @@ -35,6 +36,10 @@ def register_activity(activity_class, options = {}) @activities[key].add(execution_options.name, activity_class) end + def configure_activity_task_queue(name, options = {}) + @activity_poller_options[name] = @activity_poller_options[name].merge(options) + end + def add_workflow_task_middleware(middleware_class, *args) @workflow_task_middleware << Middleware::Entry.new(middleware_class, args) end @@ -75,7 +80,7 @@ def stop private - attr_reader :activity_poller_options, :activities, :workflows, :pollers, + attr_reader :default_activity_poller_options, :activity_poller_options, :activities, :workflows, :pollers, :workflow_task_middleware, :activity_middleware def shutting_down? @@ -86,8 +91,12 @@ def workflow_poller_for(namespace, task_queue, lookup) Workflow::Poller.new(namespace, task_queue, lookup.freeze, workflow_task_middleware) end + def activity_poller_options_for_task_queue(task_queue) + default_activity_poller_options.merge(activity_poller_options[task_queue]) + end + def activity_poller_for(namespace, task_queue, lookup) - Activity::Poller.new(namespace, task_queue, lookup.freeze, activity_middleware, activity_poller_options) + Activity::Poller.new(namespace, task_queue, lookup.freeze, activity_middleware, activity_poller_options_for_task_queue(task_queue)) end def trap_signals diff --git a/spec/unit/lib/temporal/activity/poller_spec.rb b/spec/unit/lib/temporal/activity/poller_spec.rb index 06ebc90c..5663a8e5 100644 --- a/spec/unit/lib/temporal/activity/poller_spec.rb +++ b/spec/unit/lib/temporal/activity/poller_spec.rb @@ -32,7 +32,7 @@ expect(client) .to have_received(:poll_activity_task_queue) - .with(namespace: namespace, task_queue: task_queue) + .with(namespace: namespace, task_queue: task_queue, max_tasks_per_second: nil) .twice end diff --git a/spec/unit/lib/temporal/worker_spec.rb b/spec/unit/lib/temporal/worker_spec.rb index 4823cac5..661f52a0 100644 --- a/spec/unit/lib/temporal/worker_spec.rb +++ b/spec/unit/lib/temporal/worker_spec.rb @@ -169,18 +169,27 @@ class TestWorkerActivity < Temporal::Activity activity_poller = instance_double(Temporal::Activity::Poller, start: nil) expect(Temporal::Activity::Poller) .to receive(:new) - .with('default-namespace', 'default-task-queue', an_instance_of(Temporal::ExecutableLookup), [], {thread_pool_size: 10}) - .and_return(activity_poller) + .with('default-namespace', 'default-task-queue', an_instance_of(Temporal::ExecutableLookup), [], {thread_pool_size: 15}) + .and_return(activity_poller_1) + + + expect(Temporal::Activity::Poller) + .to receive(:new) + .with('default-namespace', 'other-task-queue', an_instance_of(Temporal::ExecutableLookup), [], {thread_pool_size: 10}) + .and_return(activity_poller_2) - worker = Temporal::Worker.new(activity_thread_pool_size: 10) + worker = Temporal::Worker.new(activity_thread_pool_size: 15) allow(worker).to receive(:shutting_down?).and_return(true) worker.register_workflow(TestWorkerWorkflow) worker.register_activity(TestWorkerActivity) + worker.register_activity(TestWorkerActivity, task_queue: 'other-task-queue') - worker.start + worker.configure_activity_task_queue('other-task-queue', thread_pool_size: 10) - expect(activity_poller).to have_received(:start) + worker.start + expect(activity_poller_1).to have_received(:start) + expect(activity_poller_2).to have_received(:start) end context 'when middleware is configured' do From c47a023bbb94892bb22148311ca4df132ff75c64 Mon Sep 17 00:00:00 2001 From: Rob Holland Date: Thu, 25 Feb 2021 21:13:06 +0000 Subject: [PATCH 2/2] Refactor to only add max_tasks_per_second, leaving API as-is. --- lib/temporal/worker.rb | 18 ++++----------- spec/unit/lib/temporal/worker_spec.rb | 33 ++++++++++++++++----------- 2 files changed, 25 insertions(+), 26 deletions(-) diff --git a/lib/temporal/worker.rb b/lib/temporal/worker.rb index 57b5ecf6..ae846ce9 100644 --- a/lib/temporal/worker.rb +++ b/lib/temporal/worker.rb @@ -9,17 +9,17 @@ module Temporal class Worker # activity_thread_pool_size: number of threads that the poller can use to run activities. # can be set to 1 if you want no paralellism in your activities, at the cost of throughput. - def initialize(activity_thread_pool_size: Temporal::Activity::Poller::DEFAULT_OPTIONS[:thread_pool_size]) + def initialize(activity_thread_pool_size: Temporal::Activity::Poller::DEFAULT_OPTIONS[:thread_pool_size], activity_max_tasks_per_second: nil) @workflows = Hash.new { |hash, key| hash[key] = ExecutableLookup.new } @activities = Hash.new { |hash, key| hash[key] = ExecutableLookup.new } @pollers = [] @workflow_task_middleware = [] @activity_middleware = [] @shutting_down = false - @default_activity_poller_options = { + @activity_poller_options = { thread_pool_size: activity_thread_pool_size, + max_tasks_per_second: activity_max_tasks_per_second } - @activity_poller_options = Hash.new { {} } end def register_workflow(workflow_class, options = {}) @@ -36,10 +36,6 @@ def register_activity(activity_class, options = {}) @activities[key].add(execution_options.name, activity_class) end - def configure_activity_task_queue(name, options = {}) - @activity_poller_options[name] = @activity_poller_options[name].merge(options) - end - def add_workflow_task_middleware(middleware_class, *args) @workflow_task_middleware << Middleware::Entry.new(middleware_class, args) end @@ -80,7 +76,7 @@ def stop private - attr_reader :default_activity_poller_options, :activity_poller_options, :activities, :workflows, :pollers, + attr_reader :activity_poller_options, :activities, :workflows, :pollers, :workflow_task_middleware, :activity_middleware def shutting_down? @@ -91,12 +87,8 @@ def workflow_poller_for(namespace, task_queue, lookup) Workflow::Poller.new(namespace, task_queue, lookup.freeze, workflow_task_middleware) end - def activity_poller_options_for_task_queue(task_queue) - default_activity_poller_options.merge(activity_poller_options[task_queue]) - end - def activity_poller_for(namespace, task_queue, lookup) - Activity::Poller.new(namespace, task_queue, lookup.freeze, activity_middleware, activity_poller_options_for_task_queue(task_queue)) + Activity::Poller.new(namespace, task_queue, lookup.freeze, activity_middleware, activity_poller_options) end def trap_signals diff --git a/spec/unit/lib/temporal/worker_spec.rb b/spec/unit/lib/temporal/worker_spec.rb index 661f52a0..6d4fee14 100644 --- a/spec/unit/lib/temporal/worker_spec.rb +++ b/spec/unit/lib/temporal/worker_spec.rb @@ -144,12 +144,12 @@ class TestWorkerActivity < Temporal::Activity allow(Temporal::Activity::Poller) .to receive(:new) - .with('default-namespace', 'default-task-queue', an_instance_of(Temporal::ExecutableLookup), [], {thread_pool_size: 20}) + .with('default-namespace', 'default-task-queue', an_instance_of(Temporal::ExecutableLookup), [], {max_tasks_per_second: nil, thread_pool_size: 20}) .and_return(activity_poller_1) allow(Temporal::Activity::Poller) .to receive(:new) - .with('default-namespace', 'other-task-queue', an_instance_of(Temporal::ExecutableLookup), [], {thread_pool_size: 20}) + .with('default-namespace', 'other-task-queue', an_instance_of(Temporal::ExecutableLookup), [], {max_tasks_per_second: nil, thread_pool_size: 20}) .and_return(activity_poller_2) subject.register_workflow(TestWorkerWorkflow) @@ -169,27 +169,34 @@ class TestWorkerActivity < Temporal::Activity activity_poller = instance_double(Temporal::Activity::Poller, start: nil) expect(Temporal::Activity::Poller) .to receive(:new) - .with('default-namespace', 'default-task-queue', an_instance_of(Temporal::ExecutableLookup), [], {thread_pool_size: 15}) - .and_return(activity_poller_1) + .with('default-namespace', 'default-task-queue', an_instance_of(Temporal::ExecutableLookup), [], {max_tasks_per_second: nil, thread_pool_size: 10}) + .and_return(activity_poller) + worker = Temporal::Worker.new(activity_thread_pool_size: 10) + allow(worker).to receive(:shutting_down?).and_return(true) + worker.register_workflow(TestWorkerWorkflow) + worker.register_activity(TestWorkerActivity) + worker.start + + expect(activity_poller).to have_received(:start) + end + + it 'can have an activity poller which throttles tasks per second' do + activity_poller = instance_double(Temporal::Activity::Poller, start: nil) expect(Temporal::Activity::Poller) .to receive(:new) - .with('default-namespace', 'other-task-queue', an_instance_of(Temporal::ExecutableLookup), [], {thread_pool_size: 10}) - .and_return(activity_poller_2) + .with('default-namespace', 'default-task-queue', an_instance_of(Temporal::ExecutableLookup), [], {max_tasks_per_second: 15, thread_pool_size: 20}) + .and_return(activity_poller) - worker = Temporal::Worker.new(activity_thread_pool_size: 15) + worker = Temporal::Worker.new(activity_max_tasks_per_second: 15) allow(worker).to receive(:shutting_down?).and_return(true) worker.register_workflow(TestWorkerWorkflow) worker.register_activity(TestWorkerActivity) - worker.register_activity(TestWorkerActivity, task_queue: 'other-task-queue') - - worker.configure_activity_task_queue('other-task-queue', thread_pool_size: 10) worker.start - expect(activity_poller_1).to have_received(:start) - expect(activity_poller_2).to have_received(:start) + expect(activity_poller).to have_received(:start) end context 'when middleware is configured' do @@ -221,7 +228,7 @@ class TestWorkerActivity < Temporal::Activity allow(Temporal::Activity::Poller) .to receive(:new) - .with('default-namespace', 'default-task-queue', an_instance_of(Temporal::ExecutableLookup), [entry_2], thread_pool_size: 20) + .with('default-namespace', 'default-task-queue', an_instance_of(Temporal::ExecutableLookup), [entry_2], max_tasks_per_second: nil, thread_pool_size: 20) .and_return(activity_poller_1) subject.register_workflow(TestWorkerWorkflow)