diff --git a/README.md b/README.md index 04dbb18..136db56 100644 --- a/README.md +++ b/README.md @@ -67,6 +67,22 @@ route_key = "hello" end) ``` +You can also use named connection like this: + +``` elixir +options = %{ + url: "amqp://rabbitmq:5672", + connection_id: :publisher, + exchange: "test-exchange", + routing_key: "test-messages", +} + +Tackle.publish("Hi!", options) +``` + +This example will look for already opened connection with name `:publisher` and reuse it. +If the connection is not found, it will be created. Connections are managed by `Tackle.Connection`. + ## Consuming messages from an exchange First, declare a consumer module: diff --git a/lib/tackle.ex b/lib/tackle.ex index 04f3a1f..949fc1e 100644 --- a/lib/tackle.ex +++ b/lib/tackle.ex @@ -19,31 +19,38 @@ defmodule Tackle do exchange = options[:exchange] routing_key = options[:routing_key] exchange_opts = options[:exchange_opts] || [] + connection_id = options[:connection_id] || :default {_exchange_type, exchange_name} = exchange |> Tackle.Util.parse_exchange() - Logger.debug("Connecting to '#{url}'") - {:ok, connection} = AMQP.Connection.open(url) + {:ok, connection} = Tackle.Connection.open(connection_id, url) channel = Tackle.Channel.create(connection) try do Tackle.Exchange.create(channel, exchange, exchange_opts) Tackle.Exchange.publish(channel, exchange_name, message, routing_key) after - AMQP.Channel.close(channel) - AMQP.Connection.close(connection) + Tackle.Util.cleanup(connection_id, connection, channel) end end def republish(options) do url = options[:url] queue = options[:queue] - exchange = options[:exchange] + exchange_name = options[:exchange] routing_key = options[:routing_key] count = options[:count] || 1 + connection_id = options[:connection_id] || :default + + {:ok, connection} = Tackle.Connection.open(connection_id, url) + channel = Tackle.Channel.create(connection) - Tackle.Republisher.republish(url, queue, exchange, routing_key, count) + try do + Tackle.Republisher.republish(url, queue, exchange_name, routing_key, count) + after + Tackle.Util.cleanup(connection_id, connection, channel) + end end end diff --git a/lib/tackle/connection.ex b/lib/tackle/connection.ex index 1838b6e..ff0b91b 100644 --- a/lib/tackle/connection.ex +++ b/lib/tackle/connection.ex @@ -31,6 +31,19 @@ defmodule Tackle.Connection do open_(name, url) end + def open(url) do + Logger.debug("Connecting to '#{scrub_url(url)}'") + + AMQP.Connection.open(url) + end + + defp scrub_url(url) do + url + |> URI.parse() + |> Map.put(:userinfo, nil) + |> URI.to_string() + end + @doc """ Get a list of opened connections """ @@ -101,6 +114,4 @@ defmodule Tackle.Connection do defp validate_connection_process_rh(_alive? = false, _connection, _name) do {:error, :no_process} end - - def open(url), do: AMQP.Connection.open(url) end diff --git a/lib/tackle/consumer.ex b/lib/tackle/consumer.ex index 4d3dc3d..fc1b146 100644 --- a/lib/tackle/consumer.ex +++ b/lib/tackle/consumer.ex @@ -134,7 +134,8 @@ defmodule Tackle.Consumer do delay_queue: delay_queue, dead_queue: dead_queue, retry_limit: retry_limit, - consumer_tag: consumer_tag + consumer_tag: consumer_tag, + connection_id: connection_id } {:ok, state} @@ -189,6 +190,7 @@ defmodule Tackle.Consumer do retry_count = Tackle.DelayedRetry.retry_count_from_headers(headers) options = [ + connection_id: state.connection_id, persistent: true, headers: [ retry_count: retry_count + 1 diff --git a/lib/tackle/delayed_retry.ex b/lib/tackle/delayed_retry.ex index dbb5ecd..d4616a4 100644 --- a/lib/tackle/delayed_retry.ex +++ b/lib/tackle/delayed_retry.ex @@ -8,16 +8,15 @@ defmodule Tackle.DelayedRetry do def retry_count_from_headers([_ | tail]), do: retry_count_from_headers(tail) def publish(url, queue, payload, options) do - Logger.info("Connecting to '#{url}'") + connection_id = options[:connection_id] || :default - {:ok, connection} = AMQP.Connection.open(url) + {:ok, connection} = Tackle.Connection.open(connection_id, url) {:ok, channel} = Channel.open(connection) try do :ok = AMQP.Basic.publish(channel, "", queue, payload, options) after - AMQP.Channel.close(channel) - AMQP.Connection.close(connection) + Tackle.Util.cleanup(connection_id, connection, channel) end end end diff --git a/lib/tackle/republisher.ex b/lib/tackle/republisher.ex index e24d51e..2438d71 100644 --- a/lib/tackle/republisher.ex +++ b/lib/tackle/republisher.ex @@ -2,22 +2,26 @@ defmodule Tackle.Republisher do use AMQP require Logger - def republish(url, queue, exchange, routing_key, count) do - Logger.info("Connecting to '#{url}'") - {:ok, connection} = AMQP.Connection.open(url) + @deprecated "Use Tackle.republish/1 instead" + def republish(url, queue, exchange, routing_key, count) when is_binary(url) do + connection_id = :default + {:ok, connection} = Tackle.Connection.open(connection_id, url) channel = Tackle.Channel.create(connection) try do - 0..(count - 1) - |> Enum.each(fn idx -> - republish_one_message(channel, queue, exchange, routing_key, idx) - end) + republish(channel, queue, exchange, routing_key, count) after - AMQP.Channel.close(channel) - AMQP.Connection.close(connection) + Tackle.Util.cleanup(connection_id, connection, channel) end end + def republish(channel, queue, exchange, routing_key, count) do + 0..(count - 1) + |> Enum.each(fn idx -> + republish_one_message(channel, queue, exchange, routing_key, idx) + end) + end + defp republish_one_message(channel, queue, exchange, routing_key, idx) do Logger.info("(#{idx}) Fetching message... from '#{inspect(queue)}' queue") diff --git a/lib/tackle/util.ex b/lib/tackle/util.ex index 5f500d9..0427ba7 100644 --- a/lib/tackle/util.ex +++ b/lib/tackle/util.ex @@ -7,4 +7,13 @@ defmodule Tackle.Util do name -> {:direct, name} end end + + def cleanup(:default, connection, channel) do + AMQP.Channel.close(channel) + AMQP.Connection.close(connection) + end + + def cleanup(_, _, channel) do + AMQP.Channel.close(channel) + end end diff --git a/test/integration/republish_test.exs b/test/integration/republish_test.exs index 61066d3..af2d286 100644 --- a/test/integration/republish_test.exs +++ b/test/integration/republish_test.exs @@ -88,7 +88,7 @@ defmodule Tackle.RepublishTest do # MessageTrace.clear("fixed-service") - {:ok, _} = FixedConsumer.start_link() + {:ok, fixed_consumer} = FixedConsumer.start_link() :timer.sleep(1000) Tackle.republish(%{ @@ -99,13 +99,10 @@ defmodule Tackle.RepublishTest do count: 2 }) + GenServer.stop(fixed_consumer) :timer.sleep(2000) end - # Since bumping the `amqp` dependency from 1.1.0 - the process is not connecting fast enough to the queue. - # This causes the test to fail. I'm not sure why this is happening, but I'm skipping the test for now. - @tag :skip - @tag :fixme test "consumes only two messages" do assert MessageTrace.content("fixed-service") == "Hi there!" end