Consumer with custom deadletter:
- queue name
- exchange name
- routing key
- queue arguments and type (transient instead of durable)
deadletter_queue_options - Queue options for the deadletter queue as declared in AMQP.Queue.declare/3.
defmodule ConsumerWithCustomDeadletterConfiguration do
  @behaviour GenRMQ.Consumer

  def init() do
    [
      connection: "amqp://guest:guest@localhost:5672",
      queue: "example_queue",
      exchange: "example_exchange",
      routing_key: "routing_key.#",
      prefetch_count: "10",
      deadletter_queue: "custom_deadletter_queue",
      deadletter_exchange: "custom_deadletter_exchange",
      deadletter_routing_key: "custom_deadletter_routing_key",
      # Options used when the deadletter queue is declared
      deadletter_queue_options: [
        # Transient queue instead of a durable one
        durable: false,
        arguments: [
          # Queue expires after 1 hour (value in milliseconds) without use
          {"x-expires", :long, 3_600_000},
          # Queue supports message priorities up to 3
          {"x-max-priority", :long, 3}
        ]
      ]
    ]
  end

  # Acknowledge every message that is handled successfully
  def handle_message(%GenRMQ.Message{} = message), do: GenRMQ.Consumer.ack(message)

  # Reject without requeueing, so the message can be dead-lettered
  def handle_error(%GenRMQ.Message{} = message, _reason), do: GenRMQ.Consumer.reject(message, false)

  def consumer_tag(), do: "consumer-tag"

  def start_link(), do: GenRMQ.Consumer.start_link(__MODULE__, name: __MODULE__)
end
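A consumer like this is usually started under a supervisor. The following is a minimal sketch, assuming a hypothetical ExampleApp.Application module that is not part of the original example. The child spec is written out by hand because start_link/0 in the consumer above takes no arguments.

```elixir
defmodule ExampleApp.Application do
  # Hypothetical application module, used only for illustration.
  use Application

  # Child specs for the supervision tree. The consumer module is referenced
  # by name and is only called once the supervisor starts.
  def children do
    [
      %{
        id: ConsumerWithCustomDeadletterConfiguration,
        start: {ConsumerWithCustomDeadletterConfiguration, :start_link, []}
      }
    ]
  end

  @impl true
  def start(_type, _args) do
    # Restart only the consumer if it crashes.
    Supervisor.start_link(children(), strategy: :one_for_one, name: ExampleApp.Supervisor)
  end
end
```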
- durable custom_deadletter_exchange exchange created or redeclared
- transient, priority and with ttl custom_deadletter_queue queue created or redeclared and bound to custom_deadletter_exchange exchange
- durable topic example_exchange exchange created or redeclared
- durable example_queue queue created or redeclared and bound to example_exchange exchange
- queue example_queue has a deadletter exchange set to custom_deadletter_exchange
- every handle_message callback will be executed in a separate supervised task. If the task fails, handle_error/2 will be called
- on failed RabbitMQ connection it will wait for a bit and then reconnect
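
To make the topology above concrete, here is a rough sketch of declaring it by hand with the amqp library on an already open channel. DeadletterTopologySketch and its functions are hypothetical names, not GenRMQ's actual implementation. The topic type for the deadletter exchange and the x-dead-letter-routing-key argument on the main queue are assumptions, not something the list above confirms.

```elixir
defmodule DeadletterTopologySketch do
  # Hypothetical helper, not part of GenRMQ.
  @dl_exchange "custom_deadletter_exchange"
  @dl_queue "custom_deadletter_queue"
  @dl_routing_key "custom_deadletter_routing_key"

  # Arguments that point the main queue at the deadletter exchange.
  # The routing key argument is an assumption made for this sketch.
  def main_queue_arguments do
    [
      {"x-dead-letter-exchange", :longstr, @dl_exchange},
      {"x-dead-letter-routing-key", :longstr, @dl_routing_key}
    ]
  end

  # Options for the deadletter queue, copied from deadletter_queue_options.
  def deadletter_queue_options do
    [
      durable: false,
      arguments: [{"x-expires", :long, 3_600_000}, {"x-max-priority", :long, 3}]
    ]
  end

  # Declares both exchanges and queues on an open AMQP channel.
  def declare(chan) do
    # Deadletter side: durable exchange (type assumed), transient queue.
    :ok = AMQP.Exchange.declare(chan, @dl_exchange, :topic, durable: true)
    {:ok, _} = AMQP.Queue.declare(chan, @dl_queue, deadletter_queue_options())
    :ok = AMQP.Queue.bind(chan, @dl_queue, @dl_exchange, routing_key: @dl_routing_key)

    # Main side: durable topic exchange and durable queue.
    :ok = AMQP.Exchange.declare(chan, "example_exchange", :topic, durable: true)

    {:ok, _} =
      AMQP.Queue.declare(chan, "example_queue",
        durable: true,
        arguments: main_queue_arguments()
      )

    :ok = AMQP.Queue.bind(chan, "example_queue", "example_exchange", routing_key: "routing_key.#")
  end
end
```

Calling declare/1 needs the amqp dependency and a reachable broker. The two option helpers are plain data and can be inspected on their own.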