Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Queue.subscribe! #20

Merged
merged 7 commits into from
Jun 14, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 46 additions & 6 deletions lib/bunny_mock/channel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,46 @@ def confirm_select(callback = nil)
# noop
end

##
# Does nothing atm.
#
# @return nil
# @api public
#
def prefetch(*)
# noop
end

##
# Does not actually wait, but always return true.
#
# @return true
# @api public
#
def wait_for_confirms(*)
true
end

##
# Does nothing atm.
#
# @return nil
# @api public
#
def acknowledge(*)
# noop
end

##
# Does nothing atm.
#
# @return nil
# @api public
#
def reject(*)
# noop
end

# @endgroup

#
Expand All @@ -260,11 +300,11 @@ def queue_bind(queue, key, xchg)
end

# @private
def queue_unbind(key, xchg)
def queue_unbind(queue, key, xchg)
exchange = @connection.find_exchange xchg
raise Bunny::NotFound.new("Exchange '#{xchg}' was not found", self, false) unless exchange

exchange.remove_route key
exchange.remove_route key, queue
end

# @private
Expand All @@ -276,11 +316,11 @@ def xchg_bound_to?(receiver, key, name)
end

# @private
def xchg_routes_to?(key, xchg)
def xchg_routes_to?(queue, key, xchg)
exchange = @connection.find_exchange xchg
raise Bunny::NotFound.new("Exchange '#{xchg}' was not found", self, false) unless exchange

exchange.routes_to? key
exchange.routes_to? queue, routing_key: key
end

# @private
Expand All @@ -292,11 +332,11 @@ def xchg_bind(receiver, routing_key, name)
end

# @private
def xchg_unbind(routing_key, name)
def xchg_unbind(routing_key, name, exchange)
source = @connection.find_exchange name
raise Bunny::NotFound.new("Exchange '#{name}' was not found", self, false) unless source

source.remove_route routing_key
source.remove_route routing_key, exchange
end

private
Expand Down
21 changes: 12 additions & 9 deletions lib/bunny_mock/exchange.rb
Original file line number Diff line number Diff line change
Expand Up @@ -168,13 +168,11 @@ def bind(exchange, opts = {})
#
def unbind(exchange, opts = {})
if exchange.respond_to?(:remove_route)

# we can do the unbinding ourselves
exchange.remove_route opts.fetch(:routing_key, @name)
exchange.remove_route opts.fetch(:routing_key, @name), self
else

# we need the channel to look up the exchange
@channel.xchg_unbind opts.fetch(:routing_key, @name), exchange
@channel.xchg_unbind opts.fetch(:routing_key, @name), exchange, self
end

self
Expand All @@ -195,7 +193,6 @@ def unbind(exchange, opts = {})
#
def bound_to?(exchange, opts = {})
if exchange.respond_to?(:routes_to?)

# we can find out on the exchange object
exchange.routes_to? self, opts
else
Expand All @@ -218,7 +215,8 @@ def bound_to?(exchange, opts = {})
#
def routes_to?(exchange_or_queue, opts = {})
route = exchange_or_queue.respond_to?(:name) ? exchange_or_queue.name : exchange_or_queue
@routes.key? opts.fetch(:routing_key, route)
rk = opts.fetch(:routing_key, route)
@routes.key?(rk) && @routes[rk].any? { |r| r == exchange_or_queue }
end

def has_binding?(exchange_or_queue, opts = {}) # rubocop:disable Style/PredicateName
Expand All @@ -245,12 +243,17 @@ def deliver(payload, opts, key)

# @private
def add_route(key, xchg_or_queue)
@routes[key] = xchg_or_queue
@routes[key] ||= []
@routes[key] << xchg_or_queue
end

# @private
def remove_route(key)
@routes.delete key
def remove_route(key, xchg_or_queue)
instance = xchg_or_queue.respond_to?(:name) ? xchg_or_queue.name : xchg_or_queue
@routes[key].delete_if do |r|
route = r.respond_to?(:name) ? r.name : r
route == instance
end if @routes.key? key
end
end
end
2 changes: 1 addition & 1 deletion lib/bunny_mock/exchanges/direct.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class Direct < BunnyMock::Exchange
# @api public
#
def deliver(payload, opts, key)
@routes[key].publish payload, opts if @routes[key]
@routes[key].each { |route| route.publish payload, opts } if @routes[key]
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/bunny_mock/exchanges/fanout.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class Fanout < BunnyMock::Exchange
# @api public
#
def deliver(payload, opts, _key)
@routes.each_value { |destination| destination.publish(payload, opts) }
@routes.values.flatten.each { |destination| destination.publish(payload, opts) }
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/bunny_mock/exchanges/headers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class Header < BunnyMock::Exchange
#
def deliver(payload, opts, key)
# ~: proper headers exchange implementation
@routes[key].publish payload, opts if @routes[key]
@routes[key].each { |route| route.publish payload, opts } if @routes[key]
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/bunny_mock/exchanges/topic.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class Topic < BunnyMock::Exchange
#
def deliver(payload, opts, key)
delivery_routes = @routes.dup.keep_if { |route, _| key =~ route_to_regex(route) }
delivery_routes.values.each { |dest| dest.publish(payload, opts) }
delivery_routes.values.flatten.each { |dest| dest.publish(payload, opts) }
end

private
Expand Down
38 changes: 32 additions & 6 deletions lib/bunny_mock/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,22 @@ def publish(payload, opts = {})

# add to messages
@messages << { message: payload, options: opts }
yield_consumers
self
end

##
# Adds a consumer to the queue (subscribes for message deliveries).
#
# All params are ignored atm. Takes a block which is called when a message is delivered
# to the queue
#
# @api public
#
def subscribe(*_args, &block)
@consumers ||= []
@consumers << block
yield_consumers

self
end
Expand Down Expand Up @@ -109,11 +125,11 @@ def unbind(exchange, opts = {})
if exchange.respond_to?(:remove_route)

# we can do the unbinding ourselves
exchange.remove_route opts.fetch(:routing_key, @name)
exchange.remove_route opts.fetch(:routing_key, @name), self
else

# we need the channel to lookup the exchange
@channel.queue_unbind opts.fetch(:routing_key, @name), exchange
@channel.queue_unbind self, opts.fetch(:routing_key, @name), exchange
end
end

Expand All @@ -134,13 +150,11 @@ def bound_to?(exchange, opts = {})
check_queue_deleted!

if exchange.respond_to?(:routes_to?)

# we can do the check ourselves
exchange.routes_to? opts.fetch(:routing_key, @name)
exchange.routes_to? self, opts
else

# we need the channel to lookup the exchange
@channel.xchg_routes_to? opts.fetch(:routing_key, @name), exchange
@channel.xchg_routes_to? self, opts.fetch(:routing_key, @name), exchange
end
end

Expand Down Expand Up @@ -223,5 +237,17 @@ def pop_response(message)

[di, mp, message[:message]]
end

# @private
def yield_consumers
return if @consumers.nil?
@consumers.each do |c|
# rubocop:disable AssignmentInCondition
while message = all.pop
response = pop_response(message)
c.call(response)
end
end
end
end
end
44 changes: 44 additions & 0 deletions spec/integration/queue_subscribe_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
describe BunnyMock::Queue, '#subscribe' do
before do
@ch1 = @session.channel
@ch2 = @session.channel
end

context 'when delevered to an exchange' do
it 'should be delevered in all queues bound to the routing key' do
t = @ch1.topic 'amq.topic'
q1 = @ch1.queue 'q1'
q2 = @ch1.queue 'q2'
q3 = @ch2.queue 'q3'
q4 = @ch2.queue 'q4'
q1.bind(t, routing_key: 'rk1')
q2.bind(t, routing_key: 'rk1')
q3.bind(t, routing_key: 'rk1')
q4.bind(t, routing_key: 'rk1')

delivered = 0
q1.subscribe do |_, _, body|
expect(body).to eq 'test'
delivered += 1
end

q2.subscribe do |_, _, body|
expect(body).to eq 'test'
delivered += 1
end

q3.subscribe do |_, _, body|
expect(body).to eq 'test'
delivered += 1
end

q4.subscribe do |_, _, body|
expect(body).to eq 'test'
delivered += 1
end

t.publish('test', { routing_key: 'rk1' })
expect(delivered).to eq 4
end
end
end
10 changes: 10 additions & 0 deletions spec/unit/bunny_mock/queue_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,16 @@
end
end

context '#subscribe' do

it 'should consume messages delivered' do
@queue.subscribe do |_delivery, _headers, body|
expect(body).to eq('test')
end
@queue.publish 'test'
end
end

context '#purge' do

it 'should clear all messages' do
Expand Down