Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ Temporal::Worker.new(
workflow_thread_pool_size: 10, # how many threads poll for workflows
binary_checksum: nil, # identifies the version of workflow worker code
activity_poll_retry_seconds: 0, # how many seconds to wait after unsuccessful poll for activities
workflow_poll_retry_seconds: 0 # how many seconds to wait after unsuccessful poll for workflows
workflow_poll_retry_seconds: 0, # how many seconds to wait after unsuccessful poll for workflows
activity_max_tasks_per_second: 0 # rate-limit for starting activity tasks (new activities + retries) on the task queue
)
```

Expand Down
10 changes: 8 additions & 2 deletions lib/temporal/activity/poller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ class Activity
class Poller
DEFAULT_OPTIONS = {
thread_pool_size: 20,
poll_retry_seconds: 0
poll_retry_seconds: 0,
max_tasks_per_second: 0 # unlimited
}.freeze

def initialize(namespace, task_queue, activity_lookup, config, middleware = [], options = {})
Expand Down Expand Up @@ -91,7 +92,8 @@ def poll_loop
end

def poll_for_task
connection.poll_activity_task_queue(namespace: namespace, task_queue: task_queue)
connection.poll_activity_task_queue(namespace: namespace, task_queue: task_queue,
max_tasks_per_second: max_tasks_per_second)
rescue ::GRPC::Cancelled
# We're shutting down and we've already reported that in the logs
nil
Expand All @@ -115,6 +117,10 @@ def poll_retry_seconds
@options[:poll_retry_seconds]
end

def max_tasks_per_second
@options[:max_tasks_per_second]
end

def thread_pool
@thread_pool ||= ThreadPool.new(
options[:thread_pool_size],
Expand Down
8 changes: 7 additions & 1 deletion lib/temporal/connection/grpc.rb
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ def respond_workflow_task_failed(namespace:, task_token:, cause:, exception:, bi
client.respond_workflow_task_failed(request)
end

def poll_activity_task_queue(namespace:, task_queue:)
def poll_activity_task_queue(namespace:, task_queue:, max_tasks_per_second: 0)
request = Temporalio::Api::WorkflowService::V1::PollActivityTaskQueueRequest.new(
identity: identity,
namespace: namespace,
Expand All @@ -265,6 +265,12 @@ def poll_activity_task_queue(namespace:, task_queue:)
)
)

if max_tasks_per_second > 0
request.task_queue_metadata = Temporalio::Api::TaskQueue::V1::TaskQueueMetadata.new(
max_tasks_per_second: Google::Protobuf::DoubleValue.new(value: max_tasks_per_second)
)
end

poll_mutex.synchronize do
return unless can_poll?

Expand Down
19 changes: 16 additions & 3 deletions lib/temporal/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ module Temporal
class Worker
# activity_thread_pool_size: number of threads that the poller can use to run activities.
# can be set to 1 if you want no paralellism in your activities, at the cost of throughput.

#
# binary_checksum: The binary checksum identifies the version of workflow worker code. It is set on each completed or failed workflow
# task. It is present in API responses that return workflow execution info, and is shown in temporal-web and tctl.
# It is traditionally a checksum of the application binary. However, Temporal server treats this as an opaque
Expand All @@ -21,13 +21,25 @@ class Worker
# from workers with these bad versions.
#
# See https://docs.temporal.io/docs/tctl/how-to-use-tctl/#recovery-from-bad-deployment----auto-reset-workflow
#
# activity_max_tasks_per_second: Optional: Sets the rate limiting on number of activities that can be executed per second
#
# This limits new activities being started and activity attempts being scheduled. It does NOT
# limit the number of concurrent activities being executed on this task queue.
#
# This is managed by the server and controls activities per second for the entire task queue
# across all the workers. Notice that the number is represented in double, so that you can set
# it to less than 1 if needed. For example, set the number to 0.1 means you want your activity
# to be executed once every 10 seconds. This can be used to protect down stream services from
# flooding. The zero value of this uses the default value. Default is unlimited.
def initialize(
config = Temporal.configuration,
activity_thread_pool_size: Temporal::Activity::Poller::DEFAULT_OPTIONS[:thread_pool_size],
workflow_thread_pool_size: Temporal::Workflow::Poller::DEFAULT_OPTIONS[:thread_pool_size],
binary_checksum: Temporal::Workflow::Poller::DEFAULT_OPTIONS[:binary_checksum],
activity_poll_retry_seconds: Temporal::Activity::Poller::DEFAULT_OPTIONS[:poll_retry_seconds],
workflow_poll_retry_seconds: Temporal::Workflow::Poller::DEFAULT_OPTIONS[:poll_retry_seconds]
workflow_poll_retry_seconds: Temporal::Workflow::Poller::DEFAULT_OPTIONS[:poll_retry_seconds],
activity_max_tasks_per_second: Temporal::Activity::Poller::DEFAULT_OPTIONS[:max_tasks_per_second]
)
@config = config
@workflows = Hash.new { |hash, key| hash[key] = ExecutableLookup.new }
Expand All @@ -39,7 +51,8 @@ def initialize(
@shutting_down = false
@activity_poller_options = {
thread_pool_size: activity_thread_pool_size,
poll_retry_seconds: activity_poll_retry_seconds
poll_retry_seconds: activity_poll_retry_seconds,
max_tasks_per_second: activity_max_tasks_per_second
}
@workflow_poller_options = {
thread_pool_size: workflow_thread_pool_size,
Expand Down
30 changes: 30 additions & 0 deletions spec/unit/lib/temporal/activity/poller_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,36 @@ def call(_); end
end
end

context 'when max_tasks_per_second is set' do
subject do
described_class.new(
namespace,
task_queue,
lookup,
config,
middleware,
{
max_tasks_per_second: 32
}
)
end

it 'sends PollActivityTaskQueue requests with the configured task rate-limit' do
times = poll(nil, times: 2)
expect(times).to be >= 2

expect(connection).to have_received(:poll_activity_task_queue)
.with(
namespace: namespace,
task_queue: task_queue,
max_tasks_per_second: 32
)
.at_least(2)
.times
end
end


context 'when connection is unable to poll and poll_retry_seconds is set' do
subject do
described_class.new(
Expand Down
43 changes: 43 additions & 0 deletions spec/unit/lib/temporal/grpc_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,49 @@ class TestDeserializer
end
end

describe '#poll_activity_task_queue' do
let(:task_queue) { 'test-task-queue' }
let(:temporal_response) do
Temporalio::Api::WorkflowService::V1::PollActivityTaskQueueResponse.new
end
let(:poll_request) do
instance_double(
"GRPC::ActiveCall::Operation",
execute: temporal_response
)
end

before do
allow(grpc_stub).to receive(:poll_activity_task_queue).with(anything, return_op: true).and_return(poll_request)
end

it 'makes an API request' do
subject.poll_activity_task_queue(namespace: namespace, task_queue: task_queue)

expect(grpc_stub).to have_received(:poll_activity_task_queue) do |request|
expect(request).to be_an_instance_of(Temporalio::Api::WorkflowService::V1::PollActivityTaskQueueRequest)
expect(request.namespace).to eq(namespace)
expect(request.task_queue.name).to eq(task_queue)
expect(request.identity).to eq(identity)
expect(request.task_queue_metadata).to be_nil
end
end

it 'makes an API request with max_tasks_per_second in the metadata' do
subject.poll_activity_task_queue(namespace: namespace, task_queue: task_queue, max_tasks_per_second: 10)

expect(grpc_stub).to have_received(:poll_activity_task_queue) do |request|
expect(request).to be_an_instance_of(Temporalio::Api::WorkflowService::V1::PollActivityTaskQueueRequest)
expect(request.namespace).to eq(namespace)
expect(request.task_queue.name).to eq(task_queue)
expect(request.identity).to eq(identity)
expect(request.task_queue_metadata).to_not be_nil
expect(request.task_queue_metadata.max_tasks_per_second).to_not be_nil
expect(request.task_queue_metadata.max_tasks_per_second.value).to eq(10)
end
end
end

describe '#add_custom_search_attributes' do
it 'calls GRPC service with supplied arguments' do
allow(grpc_operator_stub).to receive(:add_search_attributes)
Expand Down
35 changes: 30 additions & 5 deletions spec/unit/lib/temporal/worker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,8 @@ def start_and_stop(worker)
config,
[],
thread_pool_size: 20,
poll_retry_seconds: 0
poll_retry_seconds: 0,
max_tasks_per_second: 0
)
.and_return(activity_poller_1)

Expand All @@ -306,7 +307,8 @@ def start_and_stop(worker)
config,
[],
thread_pool_size: 20,
poll_retry_seconds: 0
poll_retry_seconds: 0,
max_tasks_per_second: 0
)
.and_return(activity_poller_2)

Expand All @@ -333,7 +335,7 @@ def start_and_stop(worker)
an_instance_of(Temporal::ExecutableLookup),
an_instance_of(Temporal::Configuration),
[],
{thread_pool_size: 10, poll_retry_seconds: 0}
{thread_pool_size: 10, poll_retry_seconds: 0, max_tasks_per_second: 0}
)
.and_return(activity_poller)

Expand Down Expand Up @@ -406,7 +408,7 @@ def start_and_stop(worker)
an_instance_of(Temporal::ExecutableLookup),
an_instance_of(Temporal::Configuration),
[],
{thread_pool_size: 20, poll_retry_seconds: 10}
{thread_pool_size: 20, poll_retry_seconds: 10, max_tasks_per_second: 0}
)
.and_return(activity_poller)

Expand Down Expand Up @@ -441,6 +443,28 @@ def start_and_stop(worker)
expect(workflow_poller).to have_received(:start)
end

it 'can have an activity poller that registers a task rate limit' do
activity_poller = instance_double(Temporal::Activity::Poller, start: nil, stop_polling: nil, cancel_pending_requests: nil, wait: nil)
expect(Temporal::Activity::Poller)
.to receive(:new)
.with(
'default-namespace',
'default-task-queue',
an_instance_of(Temporal::ExecutableLookup),
an_instance_of(Temporal::Configuration),
[],
{thread_pool_size: 20, poll_retry_seconds: 0, max_tasks_per_second: 5}
)
.and_return(activity_poller)

worker = Temporal::Worker.new(activity_max_tasks_per_second: 5)
worker.register_activity(TestWorkerActivity)

start_and_stop(worker)

expect(activity_poller).to have_received(:start)
end

context 'when middleware is configured' do
let(:entry_1) { instance_double(Temporal::Middleware::Entry) }
let(:entry_2) { instance_double(Temporal::Middleware::Entry) }
Expand Down Expand Up @@ -492,7 +516,8 @@ def start_and_stop(worker)
config,
[entry_2],
thread_pool_size: 20,
poll_retry_seconds: 0
poll_retry_seconds: 0,
max_tasks_per_second: 0
)
.and_return(activity_poller_1)

Expand Down