From e5f060dbc62cf3362b6ac3f88ce4fe44e31513fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20H=C3=B6rberg?= Date: Sun, 24 Mar 2024 22:51:29 +0100 Subject: [PATCH] Channel#wait_for_confirms/confirm_select thread safety Use a ConditionalVariable to make wait_for_confirms truely thread safe. Breaking change is that wait_for_confirms now returns nil instead of true/false. confirm_select also made thread safe --- lib/amqp/client/channel.rb | 67 ++++++++++++++++++++------------------ test/amqp/client_test.rb | 15 +++++++-- test/amqp/tls_test.rb | 2 +- 3 files changed, 50 insertions(+), 34 deletions(-) diff --git a/lib/amqp/client/channel.rb b/lib/amqp/client/channel.rb index b295bb9..e776812 100644 --- a/lib/amqp/client/channel.rb +++ b/lib/amqp/client/channel.rb @@ -22,8 +22,9 @@ def initialize(connection, id) @open = false @on_return = nil @confirm = nil - @unconfirmed = ::Queue.new - @unconfirmed_empty = ::Queue.new + @unconfirmed = [] + @unconfirmed_lock = Mutex.new + @unconfirmed_empty = ConditionVariable.new @basic_gets = ::Queue.new end @@ -60,7 +61,7 @@ def close(reason: "", code: 200) expect :channel_close_ok @replies.close @basic_gets.close - @unconfirmed_empty.close + @unconfirmed_lock.synchronize { @unconfirmed_empty.broadcast } @consumers.each_value(&:close) nil end @@ -73,7 +74,7 @@ def closed!(level, code, reason, classid, methodid) @closed = [level, code, reason, classid, methodid] @replies.close @basic_gets.close - @unconfirmed_empty.close + @unconfirmed_lock.synchronize { @unconfirmed_empty.broadcast } @consumers.each_value(&:close) @consumers.each_value(&:clear) # empty the queues too, messages can't be acked anymore nil @@ -267,12 +268,15 @@ def basic_publish(body, exchange, routing_key, **properties) when true then properties[:delivery_mode] = 2 when false then properties[:delivery_mode] = 1 end - + if @confirm + @unconfirmed_lock.synchronize do + @unconfirmed.push @confirm += 1 + end + end if body.bytesize.between?(1, body_max) write_bytes FrameBytes.basic_publish(id, exchange, routing_key, mandatory), FrameBytes.header(id, body.bytesize, properties), FrameBytes.body(id, body) - @unconfirmed.push @confirm += 1 if @confirm return end @@ -285,7 +289,6 @@ def basic_publish(body, exchange, routing_key, **properties) write_bytes FrameBytes.body(id, body_part) pos += len end - @unconfirmed.push @confirm += 1 if @confirm nil end @@ -396,42 +399,44 @@ def basic_recover(requeue: false) # @param no_wait [Boolean] If false the method will block until the broker has confirmed the request # @return [nil] def confirm_select(no_wait: false) - return if @confirm + return if @confirm # fast path - write_bytes FrameBytes.confirm_select(@id, no_wait) - expect :confirm_select_ok unless no_wait - @confirm = 0 + @unconfirmed_lock.synchronize do + # check again in case another thread already did this while we waited for the lock + return if @confirm + + write_bytes FrameBytes.confirm_select(@id, no_wait) + expect :confirm_select_ok unless no_wait + @confirm = 0 + end nil end # Block until all publishes messages are confirmed - # @return [Boolean] True if all message where positivly acknowledged, false if not + # @return nil def wait_for_confirms - return true if @unconfirmed.empty? - - ok = @unconfirmed_empty.pop - raise Error::Closed.new(@id, *@closed) if ok.nil? - - ok + @unconfirmed_lock.synchronize do + until @unconfirmed.empty? + @unconfirmed_empty.wait(@unconfirmed_lock) + raise Error::Closed.new(@id, *@closed) if @closed + end + end end # Called by Connection when received ack/nack from broker # @api private def confirm(args) - ack_or_nack, delivery_tag, multiple = *args - loop do - tag = @unconfirmed.pop(true) - break if tag == delivery_tag - next if multiple && tag < delivery_tag - - @unconfirmed << tag # requeue - rescue ThreadError - break + _ack_or_nack, delivery_tag, multiple = *args + @unconfirmed_lock.synchronize do + case multiple + when true + idx = @unconfirmed.index(delivery_tag) || raise("Delivery tag not found") + @unconfirmed.shift(idx + 1) + when false + @unconfirmed.delete(delivery_tag) || raise("Delivery tag not found") + end + @unconfirmed_empty.broadcast if @unconfirmed.empty? end - return unless @unconfirmed.empty? - - ok = ack_or_nack == :ack - @unconfirmed_empty.push(ok) until @unconfirmed_empty.num_waiting.zero? end # @!endgroup diff --git a/test/amqp/client_test.rb b/test/amqp/client_test.rb index a0ed417..d5bcee5 100644 --- a/test/amqp/client_test.rb +++ b/test/amqp/client_test.rb @@ -326,7 +326,7 @@ def test_it_can_select_confirm channel = connection.channel channel.confirm_select channel.basic_publish "foo", "amq.direct", "bar" - assert channel.wait_for_confirms + channel.wait_for_confirms end def test_it_can_commit_tx @@ -459,11 +459,11 @@ def test_it_can_ack_a_lot_of_msgs 10_000.times do |i| ch2.basic_publish "bar #{i + 1}", "amq.topic", "foo" end + ch2.wait_for_confirms 10_000.times do assert_equal "foo", msgs1.pop.routing_key end - assert ch2.wait_for_confirms connection.close end @@ -568,4 +568,15 @@ def test_queue_pruge_returns_msg_count ensure connection&.close end + + def test_it_can_publish_with_confirm + connection = AMQP::Client.new("amqp://localhost").connect + channel = connection.channel + q = channel.queue_declare "" + 10.times do + channel.basic_publish_confirm "foo", "", q.queue_name + end + q = channel.queue_declare q.queue_name, passive: true + assert_equal 10, q.message_count + end end diff --git a/test/amqp/tls_test.rb b/test/amqp/tls_test.rb index 58862dc..63f93c8 100644 --- a/test/amqp/tls_test.rb +++ b/test/amqp/tls_test.rb @@ -31,11 +31,11 @@ def test_it_can_ack_a_lot_of_msgs_on_tls 10_000.times do |i| ch2.basic_publish "bar #{i + 1}", "amq.topic", "foo" end + ch2.wait_for_confirms 10_000.times do assert_equal "foo", msgs1.pop.routing_key end - assert ch2.wait_for_confirms connection.close end end