-
-
Notifications
You must be signed in to change notification settings - Fork 289
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Allow a worker to be used for multiple queues. #153
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
module Shoryuken | ||
class QueueRegistration | ||
def initialize worker | ||
@worker = worker | ||
end | ||
|
||
def register_queues! queues | ||
normalize_queues(queues).each do |queue| | ||
Shoryuken.register_worker queue, @worker | ||
end | ||
end | ||
|
||
private | ||
|
||
def normalize_queues queues | ||
Array(queues).map do |queue| | ||
queue.respond_to?(:call) ? queue.call : queue | ||
end | ||
end | ||
end | ||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -39,17 +39,28 @@ def server_middleware | |
@server_chain | ||
end | ||
|
||
# TODO: Shoryuken options should not invoke side effects. | ||
# The method makes it sound like it just sets configuration, but it performs | ||
# real logic. | ||
def shoryuken_options(opts = {}) | ||
@shoryuken_options = get_shoryuken_options.merge(stringify_keys(opts || {})) | ||
queue = @shoryuken_options['queue'] | ||
if queue.respond_to? :call | ||
queue = queue.call | ||
@shoryuken_options['queue'] = queue | ||
|
||
queues = (@shoryuken_options['queues'] ||= []) | ||
|
||
if @shoryuken_options['queue'] | ||
Shoryuken.logger.warn '[DEPRECATION] queue is deprecated as an option in favor of multiple queue support, please use queues instead' | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line is too long. [142/120] |
||
|
||
queues << @shoryuken_options['queue'] | ||
@shoryuken_options['queue'] = nil | ||
end | ||
|
||
Shoryuken.register_worker(queue, self) | ||
# FIXME: We shouldn't mutate user supplied values. | ||
# Currently done to preserve behavior when a queue is a proc, which probably | ||
# shouldn't be supported. | ||
@shoryuken_options['queues'] = Shoryuken::QueueRegistration.new(self).register_queues!(queues) | ||
end | ||
|
||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. |
||
def auto_visibility_timeout? | ||
!!get_shoryuken_options['auto_visibility_timeout'] | ||
end | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
require 'spec_helper' | ||
|
||
RSpec.describe Shoryuken::QueueRegistration do | ||
describe "#register_queues!" do | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Prefer single-quoted strings when you don't need string interpolation or special symbols. |
||
let(:queues) { ["queue", -> {"block_queue"}] } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Space missing inside {. |
||
let(:unregistered_queue) { "somequeue" } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Prefer single-quoted strings when you don't need string interpolation or special symbols. |
||
let(:dummy_worker) { Class.new } | ||
|
||
before do | ||
dummy_worker.include Shoryuken::Worker | ||
allow(Shoryuken).to receive(:register_worker) | ||
end | ||
|
||
subject { described_class.new(dummy_worker).register_queues! queues } | ||
|
||
it "registers normal string queues" do | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Prefer single-quoted strings when you don't need string interpolation or special symbols. |
||
expect(Shoryuken).to receive(:register_worker). | ||
with("queue", dummy_worker) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Prefer single-quoted strings when you don't need string interpolation or special symbols. |
||
subject | ||
end | ||
|
||
it "registers the result of a block queue" do | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Prefer single-quoted strings when you don't need string interpolation or special symbols. |
||
expect(Shoryuken).to receive(:register_worker). | ||
with("block_queue", dummy_worker) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Prefer single-quoted strings when you don't need string interpolation or special symbols. |
||
subject | ||
end | ||
|
||
it "returns the queues that were registered" do | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Prefer single-quoted strings when you don't need string interpolation or special symbols. |
||
expect(subject).to eql(["queue", "block_queue"]) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Prefer single-quoted strings when you don't need string interpolation or special symbols. |
||
end | ||
end | ||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -80,6 +80,7 @@ | |
TestWorker.perform_async('delayed message', delay_seconds: 60) | ||
end | ||
|
||
# TODO: Deprecated | ||
it 'accepts an `queue` option' do | ||
new_queue = 'some_different_queue' | ||
|
||
|
@@ -99,36 +100,64 @@ | |
end | ||
|
||
describe '.shoryuken_options' do | ||
it 'registers a worker' do | ||
expect(Shoryuken.worker_registry.workers('default')).to eq([TestWorker]) | ||
let(:dummy_worker) { Class.new } | ||
|
||
before do | ||
dummy_worker.include Shoryuken::Worker | ||
end | ||
|
||
it 'accepts a block as queue name' do | ||
$queue_prefix = 'production' | ||
subject { dummy_worker.shoryuken_options shoryuken_options } | ||
|
||
context 'when using the queue key' do | ||
let(:shoryuken_options) { { 'queue' => 'a_queue' } } | ||
|
||
class NewTestWorker | ||
include Shoryuken::Worker | ||
it 'warns about deprecation' do | ||
expect(Shoryuken.logger).to receive(:warn). | ||
with('[DEPRECATION] queue is deprecated as an option in favor of multiple queue support, please use queues instead'). | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line is too long. [127/120] |
||
once | ||
subject | ||
end | ||
|
||
it 'does not keep the value from queue' do | ||
subject | ||
expect(dummy_worker.get_shoryuken_options['queue']).to be_nil | ||
end | ||
|
||
shoryuken_options queue: ->{ "#{$queue_prefix}_default" } | ||
it 'merges the queue argument into the queues key' do | ||
subject | ||
expect(dummy_worker.get_shoryuken_options['queues']).to include('a_queue') | ||
end | ||
end | ||
|
||
context 'when passing queues with blocks' do | ||
let(:shoryuken_options) { {'queues' => ['a_queue', ->{ 'a_block_queue'}] } } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Space missing to the left of {. |
||
|
||
expect(Shoryuken.worker_registry.workers('production_default')).to eq([NewTestWorker]) | ||
expect(NewTestWorker.get_shoryuken_options['queue']).to eq 'production_default' | ||
it 'resolves the blocks and stores the queues at runtime' do | ||
subject | ||
expect(dummy_worker.get_shoryuken_options['queues']).to eql(['a_queue', 'a_block_queue', 'default']) | ||
end | ||
end | ||
|
||
it 'is possible to configure the global defaults' do | ||
queue = SecureRandom.uuid | ||
Shoryuken.default_worker_options['queue'] = queue | ||
context 'with changes to the default worker options' do | ||
let(:defaults) { { 'queues' => ['randomqueues'], 'auto_delete' => false } } | ||
let(:modified_options) { Shoryuken.default_worker_options.merge(defaults) } | ||
let(:shoryuken_options) { { 'auto_delete' => true } } | ||
|
||
class GlobalDefaultsTestWorker | ||
include Shoryuken::Worker | ||
before do | ||
allow(Shoryuken).to receive(:default_worker_options). | ||
and_return(modified_options) | ||
end | ||
|
||
shoryuken_options auto_delete: true | ||
it 'overrides default configuration' do | ||
expect{subject}.to change{dummy_worker.get_shoryuken_options['auto_delete']}. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Space missing to the left of {. |
||
from(false). | ||
to(true) | ||
end | ||
|
||
expect(GlobalDefaultsTestWorker.get_shoryuken_options['queue']).to eq queue | ||
expect(GlobalDefaultsTestWorker.get_shoryuken_options['auto_delete']).to eq true | ||
expect(GlobalDefaultsTestWorker.get_shoryuken_options['batch']).to eq false | ||
it 'still contains configuration not explicitly changed' do | ||
subject | ||
expect(dummy_worker.get_shoryuken_options['queues']).to include('randomqueues') | ||
end | ||
end | ||
end | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use def with parentheses when there are parameters.