From ce03aa37372c3aaa9946fddcf3f04218e9905bb6 Mon Sep 17 00:00:00 2001 From: James Ludlow Date: Fri, 31 May 2019 12:34:38 -0500 Subject: [PATCH] Allow `prefetch_count` when consuming a channel. When starting up a consumer process this adds support for setting a prefetch on the channel. For example: children = [ {EchoConsumer, [pool_id: :consumers_pool, queue: "echo_queue", options: [prefetch_count: 1]]}, --- lib/clients/rabbitmq.ex | 10 +++++++++- lib/consumer.ex | 2 +- test/integration/consumer_test.exs | 23 ++++++++++++++--------- 3 files changed, 24 insertions(+), 11 deletions(-) diff --git a/lib/clients/rabbitmq.ex b/lib/clients/rabbitmq.ex index 3098e2d..bd7d2a3 100644 --- a/lib/clients/rabbitmq.ex +++ b/lib/clients/rabbitmq.ex @@ -11,7 +11,15 @@ defmodule ExRabbitPool.RabbitMQ do end @impl true - def consume(%Channel{} = channel, queue, consumer_pid \\ nil, options \\ []) do + def consume(channel, queue, consumer_pid \\ nil, options \\ []) + + def consume(%Channel{} = channel, queue, consumer_pid, [prefetch_count: prefetch_count] = options) do + Logger.warn("[ExRabbitPool.RabbitMQ.consume] queue: #{inspect queue} setting prefetch_count to #{prefetch_count}") + :ok = Basic.qos(channel, prefetch_count: prefetch_count) + Basic.consume(channel, queue, consumer_pid, options) + end + + def consume(%Channel{} = channel, queue, consumer_pid, options) do Basic.consume(channel, queue, consumer_pid, options) end diff --git a/lib/consumer.ex b/lib/consumer.ex index e6b9fb3..bc80553 100644 --- a/lib/consumer.ex +++ b/lib/consumer.ex @@ -166,7 +166,7 @@ defmodule ExRabbitPool.Consumer do # process and monitors it handle crashes and reconnections defp handle_channel_checkout( {:ok, %{pid: channel_pid} = channel}, - %{config: config, queue: queue, adapter: adapter, config: config} = state + %{config: config, queue: queue, adapter: adapter} = state ) do config = Keyword.get(config, :options, []) diff --git a/test/integration/consumer_test.exs b/test/integration/consumer_test.exs index 0071b4f..10c440b 100644 --- a/test/integration/consumer_test.exs +++ b/test/integration/consumer_test.exs @@ -1,6 +1,7 @@ defmodule ExRabbitPool.ConsumerTest do use ExUnit.Case, async: false + import ExUnit.CaptureLog alias ExRabbitPool.Worker.SetupQueue alias ExRabbitPool.RabbitMQ alias AMQP.Queue @@ -86,15 +87,19 @@ defmodule ExRabbitPool.ConsumerTest do end test "should be able to consume messages out of rabbitmq", %{pool_id: pool_id, queue: queue} do - pid = start_supervised!({TestConsumer, pool_id: pool_id, queue: queue}) - :erlang.trace(pid, true, [:receive]) - - ExRabbitPool.with_channel(pool_id, fn {:ok, channel} -> - assert :ok = RabbitMQ.publish(channel, "#{queue}_exchange", "", "Hello Consumer!") - assert_receive {:trace, ^pid, :receive, {:basic_deliver, "Hello Consumer!", _}}, 1000 - {:ok, result} = Queue.status(channel, queue) - assert result == %{consumer_count: 1, message_count: 0, queue: queue} - end) + logs = + capture_log(fn -> + pid = start_supervised!({TestConsumer, pool_id: pool_id, queue: queue, options: [prefetch_count: 19]}) + :erlang.trace(pid, true, [:receive]) + + ExRabbitPool.with_channel(pool_id, fn {:ok, channel} -> + assert :ok = RabbitMQ.publish(channel, "#{queue}_exchange", "", "Hello Consumer!") + assert_receive {:trace, ^pid, :receive, {:basic_deliver, "Hello Consumer!", _}}, 1000 + {:ok, result} = Queue.status(channel, queue) + assert result == %{consumer_count: 1, message_count: 0, queue: queue} + end) + end) + assert logs =~ "[ExRabbitPool.RabbitMQ.consume] queue: #{inspect queue} setting prefetch_count to 19" end test "should be able to consume messages out of rabbitmq with default consumer", %{