Skip to content

Commit

Permalink
Add Eears#setup_consumers
Browse files Browse the repository at this point in the history
  • Loading branch information
brerx authored and johannesluedke committed Oct 26, 2023
1 parent 9333ff8 commit 6552591
Show file tree
Hide file tree
Showing 8 changed files with 70 additions and 33 deletions.
1 change: 1 addition & 0 deletions .rspec
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
--require spec_helper
--format doc
1 change: 1 addition & 0 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ Metrics/ClassLength:

Metrics/MethodLength:
Enabled: true
Max: 12
Exclude:
- 'spec/**/*.rb'
- 'test/**/*.rb'
Expand Down
1 change: 1 addition & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ PLATFORMS
arm64-darwin-20
arm64-darwin-21
arm64-darwin-22
ruby
x86_64-darwin-20
x86_64-darwin-21
x86_64-darwin-22
Expand Down
5 changes: 5 additions & 0 deletions lib/ears.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ def setup(&block)
Ears::Setup.new.instance_eval(&block)
end

# Quick setup your consumers (including exchanges and queues).
def setup_consumers(*consumer_classes)
Ears::Setup.new.setup_consumers(*consumer_classes)
end

# Blocks the calling thread until +SIGTERM+ or +SIGINT+ is received.
# Used to keep the process alive while processing messages.
def run!
Expand Down
18 changes: 18 additions & 0 deletions lib/ears/setup.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,24 @@ def consumer(queue, consumer_class, threads = 1, args = {})
end
end

# Sets up consumers, including bindings to exchanges and queues.
#
# @param [Array<Class<Ears::Consumer>>] consumer_classes An array of subclasses of {Ears::Consumer} that call {Ears::Consumer#configure} in their class definition.
def setup_consumers(*consumer_classes)
consumer_classes.each do |consumer_class|
exchange =
exchange(
consumer_class.exchange,
consumer_class.exchange_type,
durable: consumer_class.durable_exchange,
)
configured_queue =
queue(consumer_class.queue, consumer_class.queue_options)
configured_queue.bind(exchange, routing_key: consumer_class.routing_key)
consumer(configured_queue, consumer_class)
end
end

private

def queue_options(bunny_opts, retry_arguments)
Expand Down
4 changes: 2 additions & 2 deletions spec/ears/consumer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
it 'provides a default for durable_exchange' do
custom_consumer_class.configure(mandatory_options)

expect(custom_consumer_class.durable_exchange).to eq(true)
expect(custom_consumer_class.durable_exchange).to be(true)
end

it 'provides a default for exchange_type' do
Expand Down Expand Up @@ -119,7 +119,7 @@
mandatory_options.merge({ durable_exchange: false }),
)

expect(custom_consumer_class.durable_exchange).to eq(false)
expect(custom_consumer_class.durable_exchange).to be(false)
end
end

Expand Down
62 changes: 31 additions & 31 deletions spec/ears/setup_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,25 @@
require 'ears/setup'

RSpec.describe Ears::Setup do
let(:connection) { instance_double(Bunny::Session) }
let(:channel) { instance_double(Bunny::Channel, number: 1) }
subject(:setup) { Ears::Setup.new }

let(:connection) { instance_double(Bunny::Session, create_channel: channel) }
let(:channel) do
instance_double(
Bunny::Channel,
number: 1,
prefetch: nil,
on_uncaught_exception: nil,
)
end
let(:exchange) { instance_double(Bunny::Exchange) }
let(:queue) { instance_double(Bunny::Queue, name: 'queue', options: {}) }
let(:queue) do
instance_double(Bunny::Queue, name: 'queue', channel: channel, options: {})
end

before do
allow(connection).to receive(:create_channel).and_return(channel)
allow(Ears).to receive_messages(connection: connection, channel: channel)
allow(channel).to receive(:prefetch)
allow(channel).to receive(:on_uncaught_exception)
allow(Bunny::Queue).to receive(:new).and_return(queue)
allow(queue).to receive(:channel).and_return(channel)
end

describe '#exchange' do
Expand All @@ -25,7 +32,7 @@
{},
).and_return(exchange)

expect(Ears::Setup.new.exchange('name', :topic)).to eq(exchange)
expect(setup.exchange('name', :topic)).to eq(exchange)
end

it 'passes the given options to the exchange' do
Expand All @@ -36,9 +43,7 @@
{ test: 1 },
).and_return(exchange)

expect(Ears::Setup.new.exchange('name', :topic, { test: 1 })).to eq(
exchange,
)
expect(setup.exchange('name', :topic, { test: 1 })).to eq(exchange)
end
end

Expand All @@ -50,7 +55,7 @@
{},
).and_return(queue)

expect(Ears::Setup.new.queue('name')).to eq(queue)
expect(setup.queue('name')).to eq(queue)
end

it 'passes the given options to the queue' do
Expand All @@ -60,7 +65,7 @@
{ test: 1 },
).and_return(queue)

expect(Ears::Setup.new.queue('name', { test: 1 })).to eq(queue)
expect(setup.queue('name', { test: 1 })).to eq(queue)
end

context 'with retry queue' do
Expand All @@ -78,7 +83,7 @@
).and_return(queue)

expect(
Ears::Setup.new.queue(
setup.queue(
'name',
{ retry_queue: true, retry_delay: 1000, test: 1 },
),
Expand All @@ -98,7 +103,7 @@
},
)

expect(Ears::Setup.new.queue('name', retry_queue: true)).to eq(queue)
expect(setup.queue('name', retry_queue: true)).to eq(queue)
end

it 'adds the retry queue as a deadletter to the original queue' do
Expand All @@ -113,7 +118,7 @@
},
)

expect(Ears::Setup.new.queue('name', retry_queue: true)).to eq(queue)
expect(setup.queue('name', retry_queue: true)).to eq(queue)
end

it 'uses the given retry delay for the retry queue' do
Expand All @@ -129,9 +134,9 @@
},
)

expect(
Ears::Setup.new.queue('name', retry_queue: true, retry_delay: 1000),
).to eq(queue)
expect(setup.queue('name', retry_queue: true, retry_delay: 1000)).to eq(
queue,
)
end

it 'merges retry options into the arguments' do
Expand All @@ -149,7 +154,7 @@
).and_return(queue)

expect(
Ears::Setup.new.queue(
setup.queue(
'name',
{
retry_queue: true,
Expand All @@ -168,7 +173,7 @@
it 'creates an error queue with derived name if option is set' do
expect(Bunny::Queue).to receive(:new).with(channel, 'name.error', {})

expect(Ears::Setup.new.queue('name', error_queue: true)).to eq(queue)
expect(setup.queue('name', error_queue: true)).to eq(queue)
end
end
end
Expand Down Expand Up @@ -209,7 +214,7 @@
)
expect(queue).to receive(:subscribe_with).with(consumer_wrapper)

Ears::Setup.new.consumer(queue, MyConsumer)
setup.consumer(queue, MyConsumer)
end

it 'passes the consumer arguments' do
Expand All @@ -232,7 +237,7 @@
)
expect(queue).to receive(:subscribe_with).with(consumer_wrapper)

Ears::Setup.new.consumer(queue, MyConsumer, 1, { a: 1 })
setup.consumer(queue, MyConsumer, 1, { a: 1 })
end

it 'creates a dedicated channel and queue for each consumer' do
Expand All @@ -253,18 +258,13 @@
.exactly(3)
.times

Ears::Setup.new.consumer(queue, MyConsumer, 3)
setup.consumer(queue, MyConsumer, 3)
end

it 'passes the prefetch argument to the channel' do
expect(channel).to receive(:prefetch).with(5)

Ears::Setup.new.consumer(
queue,
MyConsumer,
1,
{ prefetch: 5, bla: 'test' },
)
setup.consumer(queue, MyConsumer, 1, { prefetch: 5, bla: 'test' })
end

it 'numbers the consumers' do
Expand All @@ -283,7 +283,7 @@
{},
).and_return(consumer_wrapper)

Ears::Setup.new.consumer(queue, MyConsumer, 2)
setup.consumer(queue, MyConsumer, 2)
end
end
end
11 changes: 11 additions & 0 deletions spec/ears_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,17 @@
end
end

describe '.setup_consumers' do
let(:klasses) { %i[first_class second_class] }

it 'calls the setup_consumers' do
setup = instance_double(Ears::Setup, setup_consumers: nil)
allow(Ears::Setup).to receive(:new).and_return(setup)
described_class.setup_consumers(*klasses)
expect(setup).to have_received(:setup_consumers).with(*klasses)
end
end

describe '.stop!' do
let(:bunny_session) do
instance_double(
Expand Down

0 comments on commit 6552591

Please sign in to comment.