Skip to content

Commit

Permalink
Expose Connection#blocked? but don't block socket write on blocked
Browse files Browse the repository at this point in the history
If the server sends Connection#Blocked frame the client should stop
publishing, but not be prevented to do other actions, like consuming
messages as that might eliminate the resource problem on the server side.
  • Loading branch information
carlhoerberg committed Mar 26, 2024
1 parent b9e78f8 commit aaa9bdb
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 54 deletions.
12 changes: 10 additions & 2 deletions lib/amqp/client/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,16 @@ def initialize(uri = "", read_loop_thread: true, **options)
Thread.new { read_loop } if read_loop_thread
end

# Indicates that the server is blocking publishes.
# If the client keeps publishing the server will stop reading from the socket.
# Use the #on_blocked callback to get notified when the server is resource constrained.
# @see #on_blocked
# @see #on_unblocked
# @return [Bool]
def blocked?
!@blocked.nil?
end

# Alias for {#initialize}
# @see #initialize
# @deprecated
Expand Down Expand Up @@ -241,10 +251,8 @@ def parse_frame(type, channel_id, buf) # rubocop:disable Metrics/MethodLength
reason_len = buf.getbyte(4)
reason = buf.byteslice(5, reason_len).force_encoding("utf-8")
@blocked = reason
@write_lock.lock
@on_blocked.call(reason)
when 61 # connection#unblocked
@write_lock.unlock
@blocked = nil
@on_unblocked.call
else raise Error::UnsupportedMethodFrame, class_id, method_id
Expand Down
82 changes: 30 additions & 52 deletions test/amqp/client_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -479,46 +479,20 @@ def test_it_can_set_channel_max

def test_it_can_be_blocked
skip_if_no_sudo
connection = AMQP::Client.new("amqp://localhost").connect
ch = connection.channel
system("sudo rabbitmqctl set_vm_memory_high_watermark 0.001")
t = Thread.new do
begin
connection = AMQP::Client.new("amqp://localhost").connect
ch = connection.channel
system("sudo rabbitmqctl set_vm_memory_high_watermark 0.001")
ch.basic_publish("body", "", "q")
sleep 0.01 # server blocks after first publish
ch.basic_publish("body", "", "q")
end
assert_nil t.join(0.1) # make sure the thread is blocked
system("sudo rabbitmqctl set_vm_memory_high_watermark 0.4")
assert t.join
refute t.status # status is false when terminated normal
ensure
connection&.close
end

def test_it_will_raise_if_closed_while_blocked
skip_if_no_sudo
connection = AMQP::Client.new("amqp://localhost").connect
ch = connection.channel
system("sudo rabbitmqctl set_vm_memory_high_watermark 0.001")
t = Thread.new do
ch.basic_publish("body", "", "q")
assert connection.blocked?
system("sudo rabbitmqctl set_vm_memory_high_watermark 0.4")
sleep 0.01 # server blocks after first publish
assert_raises(AMQP::Client::Error::ConnectionClosed) do
ch.basic_publish("body", "", "q")
end
refute connection.blocked?
ensure
system("sudo rabbitmqctl set_vm_memory_high_watermark 0.4")
connection&.close
end
assert_nil t.join(0.1) # make sure the thread is blocked
Thread.new do
assert_raises(AMQP::Client::Error::ConnectionClosed) do
ch.exchange_declare("foo", "not.an.exchange.type")
end
end
sleep 0.01
connection.close
t.join
system("sudo rabbitmqctl set_vm_memory_high_watermark 0.4")
ensure
connection&.close
end

def test_it_will_publish_and_consume_properties
Expand Down Expand Up @@ -559,23 +533,27 @@ def test_it_will_publish_and_consume_properties

def test_blocked_handler
skip_if_no_sudo
q = Queue.new
client = AMQP::Client.new("amqp://localhost")
connection = client.connect
connection.on_blocked do |reason|
q << reason
end
connection.on_unblocked do
q << nil
begin
q = Queue.new
client = AMQP::Client.new("amqp://localhost")
connection = client.connect
connection.on_blocked do |reason|
q << reason
end
connection.on_unblocked do
q << nil
end
system("sudo rabbitmqctl set_vm_memory_high_watermark 0.001")
ch = connection.channel
ch.basic_publish("", "", "")
reason = q.pop
assert_equal "low on memory", reason
system("sudo rabbitmqctl set_vm_memory_high_watermark 0.4")
unblocked = q.pop
assert_nil unblocked
ensure
system("sudo rabbitmqctl set_vm_memory_high_watermark 0.4")
end
system("sudo rabbitmqctl set_vm_memory_high_watermark 0.001")
ch = connection.channel
ch.basic_publish("", "", "")
reason = q.pop
assert_equal "low on memory", reason
system("sudo rabbitmqctl set_vm_memory_high_watermark 0.4")
unblocked = q.pop
assert_nil unblocked
end

def test_queue_pruge_returns_msg_count
Expand Down

0 comments on commit aaa9bdb

Please sign in to comment.