Skip to content

Commit

Permalink
Allow worker classes to subscribe to multiple queues.
Browse files Browse the repository at this point in the history
This allows a worker to be used for more than one queue. Ocassionaly,
logic for several queues is the same, creating a class for each queue is
cumbersome and can be avoided.
  • Loading branch information
Senjai committed Nov 6, 2015
1 parent 267b825 commit 2bf77cf
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 23 deletions.
21 changes: 16 additions & 5 deletions lib/shoryuken/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,28 @@ def server_middleware
@server_chain
end

# TODO: Shoryuken options should not invoke side effects.
# The method makes it sound like it just sets configuration, but it performs
# real logic.
def shoryuken_options(opts = {})
@shoryuken_options = get_shoryuken_options.merge(stringify_keys(opts || {}))
queue = @shoryuken_options['queue']
if queue.respond_to? :call
queue = queue.call
@shoryuken_options['queue'] = queue

queues = (@shoryuken_options["queues"] ||= [])

if @shoryuken_options['queue']
Shoryuken.logger.warn "[DEPRECATION] queue is deprecated as an option in favor of multiple queue support, please use queues instead"

queues << @shoryuken_options["queue"]
@shoryuken_options["queue"] = nil
end

Shoryuken.register_worker(queue, self)
# FIXME: We shouldn't mutate user supplied values.
# Currently done to preserve behavior when a queue is a proc, which probably
# shouldn't be supported.
@shoryuken_options["queues"] = Shoryuken::QueueRegistration.new(self).register_queues!(queues)
end


def auto_visibility_timeout?
!!get_shoryuken_options['auto_visibility_timeout']
end
Expand Down
65 changes: 47 additions & 18 deletions spec/shoryuken/worker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
TestWorker.perform_async('delayed message', delay_seconds: 60)
end

# TODO: Deprecated
it 'accepts an `queue` option' do
new_queue = 'some_different_queue'

Expand All @@ -99,36 +100,64 @@
end

describe '.shoryuken_options' do
it 'registers a worker' do
expect(Shoryuken.worker_registry.workers('default')).to eq([TestWorker])
let(:dummy_worker) { Class.new }

before do
dummy_worker.include Shoryuken::Worker
end

it 'accepts a block as queue name' do
$queue_prefix = 'production'
subject { dummy_worker.shoryuken_options shoryuken_options }

context "when using the queue key" do
let(:shoryuken_options) {{ "queue" => "a_queue" }}

class NewTestWorker
include Shoryuken::Worker
it "warns about deprecation" do
expect(Shoryuken.logger).to receive(:warn).
with("[DEPRECATION] queue is deprecated as an option in favor of multiple queue support, please use queues instead").
once
subject
end

it "does not keep the value from queue" do
subject
expect(dummy_worker.get_shoryuken_options["queue"]).to be_nil
end

shoryuken_options queue: ->{ "#{$queue_prefix}_default" }
it "merges the queue argument into the queues key" do
subject
expect(dummy_worker.get_shoryuken_options["queues"]).to include("a_queue")
end
end

context "when passing queues with blocks" do
let(:shoryuken_options) {{'queues' => ["a_queue", ->{ "a_block_queue"}] }}

expect(Shoryuken.worker_registry.workers('production_default')).to eq([NewTestWorker])
expect(NewTestWorker.get_shoryuken_options['queue']).to eq 'production_default'
it "resolves the blocks and stores the queues at runtime" do
subject
expect(dummy_worker.get_shoryuken_options["queues"]).to eql(["a_queue", "a_block_queue", "default"])
end
end

it 'is possible to configure the global defaults' do
queue = SecureRandom.uuid
Shoryuken.default_worker_options['queue'] = queue
context "with changes to the default worker options" do
let(:defaults) {{ "queues" => ["randomqueues"], "auto_delete" => false }}
let(:modified_options) { Shoryuken.default_worker_options.merge(defaults) }
let(:shoryuken_options) {{ "auto_delete" => true }}

class GlobalDefaultsTestWorker
include Shoryuken::Worker
before do
allow(Shoryuken).to receive(:default_worker_options).
and_return(modified_options)
end

shoryuken_options auto_delete: true
it "overrides default configuration" do
expect{subject}.to change{dummy_worker.get_shoryuken_options["auto_delete"]}.
from(false).
to(true)
end

expect(GlobalDefaultsTestWorker.get_shoryuken_options['queue']).to eq queue
expect(GlobalDefaultsTestWorker.get_shoryuken_options['auto_delete']).to eq true
expect(GlobalDefaultsTestWorker.get_shoryuken_options['batch']).to eq false
it "still contains configuration not explicitly changed" do
subject
expect(dummy_worker.get_shoryuken_options["queues"]).to include("randomqueues")
end
end
end

Expand Down

0 comments on commit 2bf77cf

Please sign in to comment.