Skip to content

Commit

Permalink
lock around unconfirmed
Browse files Browse the repository at this point in the history
  • Loading branch information
carlhoerberg committed Mar 25, 2024
1 parent f444e19 commit 7b71150
Showing 1 changed file with 18 additions and 15 deletions.
33 changes: 18 additions & 15 deletions lib/amqp/client/channel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ def initialize(connection, id)
@on_return = nil
@confirm = nil
@unconfirmed = ::Queue.new
@unconfirmed_lock = Mutex.new
@unconfirmed_empty = ::SizedQueue.new(1)
@basic_gets = ::Queue.new
end
Expand Down Expand Up @@ -407,7 +408,9 @@ def confirm_select(no_wait: false)
# Block until all publishes messages are confirmed
# @return [Boolean] True if all message where positivly acknowledged, false if not
def wait_for_confirms
return true if @unconfirmed.empty?
@unconfirmed_lock.synchronize do
return true if @unconfirmed.empty?
end

ok = @unconfirmed_empty.pop
raise Error::Closed.new(@id, *@closed) if ok.nil?
Expand All @@ -419,24 +422,24 @@ def wait_for_confirms
# @api private
def confirm(args)
ack_or_nack, delivery_tag, multiple = *args
loop do
tag = @unconfirmed.pop(true)
break if tag == delivery_tag
next if multiple && tag < delivery_tag

@unconfirmed << tag # requeue
rescue ThreadError
break
@unconfirmed_lock.synchronize do
loop do
tag = @unconfirmed.pop(true)
break if tag == delivery_tag
next if multiple && tag < delivery_tag

@unconfirmed.push(tag) # requeue
rescue ThreadError
break
end
return unless @unconfirmed.empty?
end
return unless @unconfirmed.empty?

ok = ack_or_nack == :ack
@unconfirmed_empty.push(ok) until @unconfirmed_empty.num_waiting.zero?
begin
@unconfirmed_empty.pop(true) # don't leave a residual ok in the queue
rescue ThreadError
nil
until @unconfirmed_empty.num_waiting.zero?
@unconfirmed_empty.push(ok, timeout: 0.001) || break
end
@unconfirmed_empty.pop(timeout: 0) # don't leave a residual ok in the queue
end

# @!endgroup
Expand Down

0 comments on commit 7b71150

Please sign in to comment.