This repository has been archived by the owner on Nov 27, 2023. It is now read-only.

with_custom_queue_configuration.md

File metadata and controls

48 lines (38 loc) · 1.72 KB

Consumer with custom queue configuration

queue_options - Queue options as declared in AMQP.Queue.declare/3.
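The arguments entry of these options is a list of {name, type, value} tuples, and RabbitMQ reads the x-expires value in milliseconds. As a minimal sketch, a helper can build the keyword list from an expiry given in hours. The module name QueueOptionsSketch and its build/2 function are hypothetical and not part of GenRMQ or the amqp library.

```elixir
defmodule QueueOptionsSketch do
  # Hypothetical helper for illustration only; not part of GenRMQ.
  # Returns a keyword list in the shape expected under :queue_options.
  def build(expires_hours, max_priority) do
    [
      durable: false,
      arguments: [
        # :timer.hours/1 converts hours to milliseconds
        {"x-expires", :long, :timer.hours(expires_hours)},
        {"x-max-priority", :long, max_priority}
      ]
    ]
  end
end
```

Calling QueueOptionsSketch.build(1, 3) yields the same options that the example in this document writes out by hand.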

Example

defmodule ConsumerWithCustomQueueConfiguration do
  @behaviour GenRMQ.Consumer

  def init() do
    [
      connection: "amqp://guest:guest@localhost:5672",
      queue: "example_queue",
      exchange: "example_exchange",
      routing_key: "routing_key.#",
      prefetch_count: "10",
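      queue_options: [
        # transient queue: it does not survive a broker restart
        durable: false,
        arguments: [
          # delete the queue once it has been unused for 1 hour (value in milliseconds)
          {"x-expires", :long, 3_600_000},
          # maximum message priority supported by the queue
          {"x-max-priority", :long, 3}
        ]
      ]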
    ]
  end

  def handle_message(%GenRMQ.Message{} = message), do: GenRMQ.Consumer.ack(message)

  def handle_error(%GenRMQ.Message{} = message, _reason), do: GenRMQ.Consumer.reject(message, false)

  def consumer_tag(), do: "consumer-tag"

  def start_link(), do: GenRMQ.Consumer.start_link(__MODULE__, name: __MODULE__)
end

Outcome:

  • durable example_exchange.deadletter exchange created or redeclared
  • durable example_queue_error queue created or redeclared and bound to example_exchange.deadletter exchange
  • durable topic example_exchange exchange created or redeclared
  • transient priority queue example_queue, with a one-hour expiry (x-expires) and a maximum priority of 3, created or redeclared and bound to the example_exchange exchange
  • queue example_queue has its dead-letter exchange set to example_exchange.deadletter
  • every handle_message callback is executed in a separate supervised task; if the task fails, handle_error/2 is called
  • if the RabbitMQ connection fails, the consumer waits for a bit and then reconnects
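To get the outcome listed above, the consumer has to be started somewhere. A minimal sketch of running it under a supervisor follows. It assumes the example module is compiled into your application, that gen_rmq is a dependency, and that a broker is reachable at the configured address. MyApp.Supervisor is a hypothetical name.

```elixir
# Assumes ConsumerWithCustomQueueConfiguration from the example is available.
children = [
  %{
    id: ConsumerWithCustomQueueConfiguration,
    # start_link/0 is the function defined in the example module
    start: {ConsumerWithCustomQueueConfiguration, :start_link, []}
  }
]

# MyApp.Supervisor is a hypothetical name chosen for this sketch.
{:ok, _pid} =
  Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
```

An explicit child spec map is used here because the example module defines start_link/0 rather than a child_spec/1 function.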