From 987876245a4c9dae3cd2198f427eeef2581cf139 Mon Sep 17 00:00:00 2001 From: Nicolas Viennot Date: Fri, 11 Apr 2014 03:30:56 -0400 Subject: [PATCH] Fixes a synchronization issue in @unconfirmed_set Fixes #206 --- lib/bunny/channel.rb | 32 +++++++++++++++++--------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/lib/bunny/channel.rb b/lib/bunny/channel.rb index fbfeb8c62..c74940680 100644 --- a/lib/bunny/channel.rb +++ b/lib/bunny/channel.rb @@ -536,8 +536,10 @@ def basic_publish(payload, exchange, routing_key, opts = {}) opts[:priority] ||= 0 if @next_publish_seq_no > 0 - @unconfirmed_set.add(@next_publish_seq_no) - @next_publish_seq_no += 1 + @unconfirmed_set_mutex.synchronize do + @unconfirmed_set.add(@next_publish_seq_no) + @next_publish_seq_no += 1 + end end frames = AMQ::Protocol::Basic::Publish.encode(@id, @@ -1681,23 +1683,23 @@ def handle_basic_return(basic_return, properties, content) # @private def handle_ack_or_nack(delivery_tag, multiple, nack) - if nack - cloned_set = @unconfirmed_set.clone + @unconfirmed_set_mutex.synchronize do + if nack + cloned_set = @unconfirmed_set.clone + if multiple + cloned_set.keep_if { |i| i <= delivery_tag } + @nacked_set.merge(cloned_set) + else + @nacked_set.add(delivery_tag) + end + end + if multiple - cloned_set.keep_if { |i| i <= delivery_tag } - @nacked_set.merge(cloned_set) + @unconfirmed_set.delete_if { |i| i <= delivery_tag } else - @nacked_set.add(delivery_tag) + @unconfirmed_set.delete(delivery_tag) end - end - if multiple - @unconfirmed_set.delete_if { |i| i <= delivery_tag } - else - @unconfirmed_set.delete(delivery_tag) - end - - @unconfirmed_set_mutex.synchronize do @only_acks_received = (@only_acks_received && !nack) @confirms_continuations.push(true) if @unconfirmed_set.empty?