Skip to content

Commit

Permalink
Channel#wait_for_confirms/confirm_select thread safety
Browse files Browse the repository at this point in the history
Use a ConditionalVariable to make wait_for_confirms truely thread safe.

Breaking change is that wait_for_confirms now returns nil instead of
true/false.

confirm_select also made thread safe
  • Loading branch information
carlhoerberg committed Mar 26, 2024
1 parent 4d93f21 commit e5f060d
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 34 deletions.
67 changes: 36 additions & 31 deletions lib/amqp/client/channel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ def initialize(connection, id)
@open = false
@on_return = nil
@confirm = nil
@unconfirmed = ::Queue.new
@unconfirmed_empty = ::Queue.new
@unconfirmed = []
@unconfirmed_lock = Mutex.new
@unconfirmed_empty = ConditionVariable.new
@basic_gets = ::Queue.new
end

Expand Down Expand Up @@ -60,7 +61,7 @@ def close(reason: "", code: 200)
expect :channel_close_ok
@replies.close
@basic_gets.close
@unconfirmed_empty.close
@unconfirmed_lock.synchronize { @unconfirmed_empty.broadcast }
@consumers.each_value(&:close)
nil
end
Expand All @@ -73,7 +74,7 @@ def closed!(level, code, reason, classid, methodid)
@closed = [level, code, reason, classid, methodid]
@replies.close
@basic_gets.close
@unconfirmed_empty.close
@unconfirmed_lock.synchronize { @unconfirmed_empty.broadcast }
@consumers.each_value(&:close)
@consumers.each_value(&:clear) # empty the queues too, messages can't be acked anymore
nil
Expand Down Expand Up @@ -267,12 +268,15 @@ def basic_publish(body, exchange, routing_key, **properties)
when true then properties[:delivery_mode] = 2
when false then properties[:delivery_mode] = 1
end

if @confirm
@unconfirmed_lock.synchronize do
@unconfirmed.push @confirm += 1
end
end
if body.bytesize.between?(1, body_max)
write_bytes FrameBytes.basic_publish(id, exchange, routing_key, mandatory),
FrameBytes.header(id, body.bytesize, properties),
FrameBytes.body(id, body)
@unconfirmed.push @confirm += 1 if @confirm
return
end

Expand All @@ -285,7 +289,6 @@ def basic_publish(body, exchange, routing_key, **properties)
write_bytes FrameBytes.body(id, body_part)
pos += len
end
@unconfirmed.push @confirm += 1 if @confirm
nil
end

Expand Down Expand Up @@ -396,42 +399,44 @@ def basic_recover(requeue: false)
# @param no_wait [Boolean] If false the method will block until the broker has confirmed the request
# @return [nil]
def confirm_select(no_wait: false)
return if @confirm
return if @confirm # fast path

write_bytes FrameBytes.confirm_select(@id, no_wait)
expect :confirm_select_ok unless no_wait
@confirm = 0
@unconfirmed_lock.synchronize do
# check again in case another thread already did this while we waited for the lock
return if @confirm

write_bytes FrameBytes.confirm_select(@id, no_wait)
expect :confirm_select_ok unless no_wait
@confirm = 0
end
nil
end

# Block until all publishes messages are confirmed
# @return [Boolean] True if all message where positivly acknowledged, false if not
# @return nil
def wait_for_confirms
return true if @unconfirmed.empty?

ok = @unconfirmed_empty.pop
raise Error::Closed.new(@id, *@closed) if ok.nil?

ok
@unconfirmed_lock.synchronize do
until @unconfirmed.empty?
@unconfirmed_empty.wait(@unconfirmed_lock)
raise Error::Closed.new(@id, *@closed) if @closed
end
end
end

# Called by Connection when received ack/nack from broker
# @api private
def confirm(args)
ack_or_nack, delivery_tag, multiple = *args
loop do
tag = @unconfirmed.pop(true)
break if tag == delivery_tag
next if multiple && tag < delivery_tag

@unconfirmed << tag # requeue
rescue ThreadError
break
_ack_or_nack, delivery_tag, multiple = *args
@unconfirmed_lock.synchronize do
case multiple
when true
idx = @unconfirmed.index(delivery_tag) || raise("Delivery tag not found")
@unconfirmed.shift(idx + 1)
when false
@unconfirmed.delete(delivery_tag) || raise("Delivery tag not found")
end
@unconfirmed_empty.broadcast if @unconfirmed.empty?
end
return unless @unconfirmed.empty?

ok = ack_or_nack == :ack
@unconfirmed_empty.push(ok) until @unconfirmed_empty.num_waiting.zero?
end

# @!endgroup
Expand Down
15 changes: 13 additions & 2 deletions test/amqp/client_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ def test_it_can_select_confirm
channel = connection.channel
channel.confirm_select
channel.basic_publish "foo", "amq.direct", "bar"
assert channel.wait_for_confirms
channel.wait_for_confirms
end

def test_it_can_commit_tx
Expand Down Expand Up @@ -459,11 +459,11 @@ def test_it_can_ack_a_lot_of_msgs
10_000.times do |i|
ch2.basic_publish "bar #{i + 1}", "amq.topic", "foo"
end
ch2.wait_for_confirms

10_000.times do
assert_equal "foo", msgs1.pop.routing_key
end
assert ch2.wait_for_confirms
connection.close
end

Expand Down Expand Up @@ -568,4 +568,15 @@ def test_queue_pruge_returns_msg_count
ensure
connection&.close
end

def test_it_can_publish_with_confirm
connection = AMQP::Client.new("amqp://localhost").connect
channel = connection.channel
q = channel.queue_declare ""
10.times do
channel.basic_publish_confirm "foo", "", q.queue_name
end
q = channel.queue_declare q.queue_name, passive: true
assert_equal 10, q.message_count
end
end
2 changes: 1 addition & 1 deletion test/amqp/tls_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ def test_it_can_ack_a_lot_of_msgs_on_tls
10_000.times do |i|
ch2.basic_publish "bar #{i + 1}", "amq.topic", "foo"
end
ch2.wait_for_confirms

10_000.times do
assert_equal "foo", msgs1.pop.routing_key
end
assert ch2.wait_for_confirms
connection.close
end
end

0 comments on commit e5f060d

Please sign in to comment.