Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add worker config interface #34

Merged
merged 4 commits into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .rspec
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
--require spec_helper
--format doc
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## 0.12.0 (2023-10-26)

- add new interface to setup consumers including their exchange, queue and binding the queue to the exchange via routing key via `Ears.setup_consumers` and `configure(queue:, exchange:,routing_key:, ...)` for Ears::Consumers subclasses

## 0.11.2 (2023-10-25)

- Add documentation generation via yard
Expand Down
3 changes: 2 additions & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
ears (0.11.2)
ears (0.12.0)
bunny (~> 2.22.0)
multi_json

Expand Down Expand Up @@ -107,6 +107,7 @@ PLATFORMS
arm64-darwin-20
arm64-darwin-21
arm64-darwin-22
ruby
x86_64-darwin-20
x86_64-darwin-21
x86_64-darwin-22
Expand Down
5 changes: 5 additions & 0 deletions lib/ears.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ def setup(&block)
Ears::Setup.new.instance_eval(&block)
end

# Quick setup your consumers (including exchanges and queues).
def setup_consumers(*consumer_classes)
Ears::Setup.new.setup_consumers(*consumer_classes)
end

# Blocks the calling thread until +SIGTERM+ or +SIGINT+ is received.
# Used to keep the process alive while processing messages.
def run!
Expand Down
49 changes: 49 additions & 0 deletions lib/ears/consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,28 @@ def self.use(middleware, opts = {})
middlewares << middleware.new(opts)
end

# Configures the consumer, setting queue, exchange and other options to be used by
# the add_consumer method.
#
# @param [Hash] opts The options to configure the consumer with.
# @option opts [String] :queue The name of the queue to consume from.
# @option opts [String] :exchange The name of the exchange the queue should be bound to.
# @option opts [String] :routing_key The routing key used the queue binding.
# @option opts [Boolean] :durable_queue (true) Whether the queue should be durable.
# @option opts [Boolean] :retry_queue (false) Whether a retry queue should be provided.
# @option opts [Integer] :retry_delay (5000) The delay in milliseconds before retrying a message.
# @option opts [Boolean] :error_queue (false) Whether an error queue should be provided.
# @option opts [Boolean] :durable_exchange (true) Whether the exchange should be durable.
# @option opts [Symbol] :exchange_type (:topic) The type of exchange to use.
def self.configure(opts = {})
self.queue = opts.fetch(:queue)
self.exchange = opts.fetch(:exchange)
self.routing_key = opts.fetch(:routing_key)
self.queue_options = queue_options_from(opts: opts)
self.durable_exchange = opts.fetch(:durable_exchange, true)
self.exchange_type = opts.fetch(:exchange_type, :topic)
end

# The method that is called when a message from the queue is received.
# Keep in mind that the parameters received can be altered by middlewares!
#
Expand Down Expand Up @@ -101,5 +123,32 @@ def verify_result(result)
raise InvalidReturnError, result
end
end

class << self
attr_reader :queue,
:exchange,
:routing_key,
:queue_options,
:durable_exchange,
:exchange_type

private

def queue_options_from(opts:)
{
durable: opts.fetch(:durable_queue, true),
retry_queue: opts.fetch(:retry_queue, false),
retry_delay: opts.fetch(:retry_delay, 5000),
error_queue: opts.fetch(:error_queue, false),
}
end

attr_writer :queue,
:exchange,
:routing_key,
:queue_options,
:durable_exchange,
:exchange_type
end
end
end
1 change: 1 addition & 0 deletions lib/ears/middlewares/max_retries.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ def initialize(opts)

def call(delivery_info, metadata, payload, app)
return handle_exceeded(payload) if retries_exceeded?(metadata)

app.call(delivery_info, metadata, payload)
end

Expand Down
20 changes: 20 additions & 0 deletions lib/ears/setup.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,28 @@ def consumer(queue, consumer_class, threads = 1, args = {})
end
end

# Sets up consumers, including bindings to exchanges and queues.
#
# @param [Array<Class<Ears::Consumer>>] consumer_classes An array of subclasses of {Ears::Consumer} that call {Ears::Consumer#configure} in their class definition.
def setup_consumers(*consumer_classes)
consumer_classes.each { |consumer_class| setup_consumer(consumer_class) }
end

private

def setup_consumer(consumer_class)
exchange =
exchange(
consumer_class.exchange,
consumer_class.exchange_type,
durable: consumer_class.durable_exchange,
)
configured_queue =
queue(consumer_class.queue, consumer_class.queue_options)
configured_queue.bind(exchange, routing_key: consumer_class.routing_key)
consumer(configured_queue, consumer_class)
end

def queue_options(bunny_opts, retry_arguments)
return bunny_opts unless retry_arguments

Expand Down
2 changes: 1 addition & 1 deletion lib/ears/version.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module Ears
VERSION = '0.11.2'
VERSION = '0.12.0'
end
118 changes: 118 additions & 0 deletions spec/ears/consumer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,124 @@
allow(channel).to receive(:ack).with(delivery_tag, false)
end

describe '.configure' do
let(:mandatory_options) do
{ queue: 'test', exchange: 'exchange', routing_key: 'test.new' }
end
let(:custom_consumer_class) { Class.new(Ears::Consumer) }

it 'sets the queue' do
custom_consumer_class.configure(mandatory_options)

expect(custom_consumer_class.queue).to eq('test')
end

it 'sets the exchange' do
custom_consumer_class.configure(mandatory_options)

expect(custom_consumer_class.exchange).to eq('exchange')
end

it 'sets the routing key' do
custom_consumer_class.configure(mandatory_options)

expect(custom_consumer_class.routing_key).to eq('test.new')
end

context 'without providing specific options' do
it 'provides defaults for queue options' do
custom_consumer_class.configure(mandatory_options)

expect(custom_consumer_class.queue_options).to eq(
{
durable: true,
retry_queue: false,
retry_delay: 5000,
error_queue: false,
},
)
end

it 'provides a default for durable_exchange' do
custom_consumer_class.configure(mandatory_options)

expect(custom_consumer_class.durable_exchange).to be(true)
end

it 'provides a default for exchange_type' do
custom_consumer_class.configure(mandatory_options)

expect(custom_consumer_class.exchange_type).to eq(:topic)
end
end

context 'with durable_queue specified' do
it 'sets the queue options' do
custom_consumer_class.configure(
mandatory_options.merge({ durable_queue: false }),
)

expect(custom_consumer_class.queue_options).to include(durable: false)
end
end

context 'with retry_queue specified' do
it 'sets the queue options' do
custom_consumer_class.configure(
mandatory_options.merge({ retry_queue: true }),
)

expect(custom_consumer_class.queue_options).to include(
retry_queue: true,
)
end
end

context 'with retry_delay specified' do
it 'sets the queue options' do
custom_consumer_class.configure(
mandatory_options.merge({ retry_delay: 1000 }),
)

expect(custom_consumer_class.queue_options).to include(
retry_delay: 1000,
)
end
end

context 'with error_queue specified' do
it 'sets the queue options' do
custom_consumer_class.configure(
mandatory_options.merge({ error_queue: true }),
)

expect(custom_consumer_class.queue_options).to include(
error_queue: true,
)
end
end

context 'with durable_exchange specified' do
it 'sets the durable_exchange' do
custom_consumer_class.configure(
mandatory_options.merge({ durable_exchange: false }),
)

expect(custom_consumer_class.durable_exchange).to be(false)
end
end

context 'with exchange_type specified' do
it 'sets the exchange_type' do
custom_consumer_class.configure(
mandatory_options.merge({ exchange_type: :direct }),
)

expect(custom_consumer_class.exchange_type).to eq(:direct)
end
end
end

describe '#work' do
it 'raises a not implemented error' do
expect { instance.work(delivery_info, metadata, payload) }.to raise_error(
Expand Down
46 changes: 46 additions & 0 deletions spec/ears/setup_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,52 @@

before { allow(Ears).to receive(:channel).and_return(ears_channel) }

# rubocop:disable RSpec/SubjectStub
describe '#setup_consumers' do
let(:consumers) do
[
class_double(
Ears::Consumer,
exchange: 'my-exchange',
queue: 'my-queue',
exchange_type: :topic,
durable_exchange: true,
queue_options: {
my_option: true,
},
routing_key: 'my_key',
),
]
end
let(:an_exchange) { instance_double(Bunny::Exchange) }
let(:a_queue) { instance_double(Bunny::Queue, bind: nil) }

before do
allow(setup).to receive_messages(exchange: an_exchange, queue: a_queue)
allow(setup).to receive(:consumer)
end

it 'sets up the consumers accordingly including its queues etc.' do
setup.setup_consumers(*consumers)

expect(setup).to have_received(:exchange).with(
'my-exchange',
:topic,
durable: true,
)
expect(setup).to have_received(:queue).with(
'my-queue',
{ my_option: true },
)
expect(a_queue).to have_received(:bind).with(
an_exchange,
routing_key: 'my_key',
)
expect(setup).to have_received(:consumer).with(a_queue, consumers[0])
end
end
# rubocop:enable RSpec/SubjectStub

describe '#exchange' do
it 'creates a new Bunny exchange with the given options' do
expect(Bunny::Exchange).to receive(:new).with(
Expand Down
11 changes: 11 additions & 0 deletions spec/ears_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,17 @@
end
end

describe '.setup_consumers' do
let(:klasses) { %i[first_class second_class] }

it 'calls the setup_consumers' do
setup = instance_double(Ears::Setup, setup_consumers: nil)
allow(Ears::Setup).to receive(:new).and_return(setup)
described_class.setup_consumers(*klasses)
expect(setup).to have_received(:setup_consumers).with(*klasses)
end
end

describe '.stop!' do
let(:bunny_session) do
instance_double(
Expand Down