Mule AMQP Transport - User Guide

Welcome to AMQP

The Advanced Message Queuing Protocol (AMQP) is an open standard application layer protocol for message-oriented middleware. The defining features of AMQP are message orientation, queuing, routing (including point-to-point and publish-and-subscribe), reliability and security (Wikipedia). The specifications are available online, and several broker implementations exist, such as the very popular VMware RabbitMQ and Apache Qpid.

AMQP is built around a few easy to grasp concepts:

  • Clients connect via channels to AMQP brokers in order to send or receive messages,
  • They can publish messages to exchanges,
  • Messages published to exchanges are routed to queues, where they accumulate until they are consumed.
  • The queue that will constitute the final destination of a message is not known by the message publisher: it is determined by the type of the exchange and a piece of meta information known as the "routing key".
  • It is possible for a message to end up nowhere if no queue has been bound to the targeted exchange or if no routing rule has matched any existing queue.
  • There are four main types of exchanges: direct, fanout, topic and headers.
  • Clients interested in consuming messages must create queues and bind these queues to exchanges.
  • Queue and exchange declaration is an idempotent operation hence it is common practice to declare them on each client startup.

AMQP for the JMS savvy

If you're a Java developer, chances are you have been exposed to JMS and are wondering how AMQP differs from JMS.

In a nutshell, the main differences are the following:

  • AMQP defines both an API and a wire-format, ensuring compatibility between implementations (JMS only defines an API),

  • In JMS you publish directly to destinations (queues or topics), while in AMQP you publish to exchanges to which queues are bound (or not), which decouples the producer from the final destination of its messages.

  • For some types of exchanges, the delivery to the final destination depends on a routing key, a simple string that provides the necessary meta-information for successfully routing the message (unlike in JMS, where the name of the destination is all that is needed).

Core Transport Principles

The Mule AMQP Transport is an abstraction built on top of the previously introduced AMQP constructs: connection, channel, exchanges, queues and messages.

The transport hides the low-level concepts, like dealing with channels, but gives a great deal of control over all the constructs it encapsulates, allowing you to experience the richness of AMQP without the need to code to its API.

Here is a quick review of the main configuration elements you'll deal with when using the transport:

  • The connector element takes care of establishing the connection to AMQP brokers, deals with channels and manages a set of common properties that are shared by all consumers and publishers using this connector.
  • The inbound endpoint elements are in charge of consuming messages from AMQP queues and routing them to your components, transformers, routers or other outbound endpoints as defined in your Mule configuration.
  • The outbound endpoint elements are in charge of publishing messages to AMQP exchanges from your Mule configuration.
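As a rough sketch of how these three elements fit together, the fragment below wires one connector, one inbound endpoint and one outbound endpoint into a single flow. The names amqpSketchConnector and amqpBridgeSketch are made up for illustration, the attribute values are placeholders, and the amqp namespace is assumed to be declared on the enclosing mule element, as in the other examples of this guide.

```xml
<!-- Connector: broker location and credentials (placeholder values) -->
<amqp:connector name="amqpSketchConnector"
                host="localhost"
                virtualHost="my-vhost"
                username="my-user"
                password="my-pwd" />

<flow name="amqpBridgeSketch">
  <!-- Inbound endpoint: consumes messages from an AMQP queue -->
  <amqp:inbound-endpoint queueName="my-queue"
                         connector-ref="amqpSketchConnector" />
  <!--
  components, transformers, routers... go here
  -->
  <!-- Outbound endpoint: publishes messages to an AMQP exchange -->
  <amqp:outbound-endpoint exchangeName="my-exchange"
                          routingKey="my-key"
                          connector-ref="amqpSketchConnector" />
</flow>
```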

Message payload and properties

The AMQP transport works with another abstraction that is very important to understand: the Mule Message. A Mule Message is a transport-agnostic abstraction that encapsulates a payload and meta-information named properties. This allows your different configuration elements to deal with messages while being oblivious to the source or destination of such messages.

An AMQP message also has the notion of a payload (in bytes) and message properties, which are composed of a set of pre-defined ones (known as basic properties) and any additional custom ones. Moreover, when a message is delivered, extra properties, known as envelope properties, are also available.

The AMQP transport will create Mule Messages with byte[] payloads for inbound messages and will rely on Mule's auto transformation infrastructure to extract byte[] payloads from Mule Messages for outbound messages. Should you need to use a particular payload representation (for example XML or JSON), it is up to you to add the necessary transformers to perform the desired serialization/deserialization steps.
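For instance, a flow that handles textual payloads could convert the inbound byte[] to a String and back again before publishing. The following is only a sketch: it assumes the standard Mule 3 core transformers byte-array-to-string-transformer and string-to-byte-array-transformer are available, the flow name is made up, and it reuses the amqpLocalhostConnector connector defined in the examples of this guide.

```xml
<flow name="amqpTextPayloadSketch">
  <amqp:inbound-endpoint queueName="my-queue"
                         connector-ref="amqpLocalhostConnector" />
  <!-- Inbound AMQP payloads arrive as byte[]: turn them into a String -->
  <byte-array-to-string-transformer />
  <!--
  components working on the String payload go here
  -->
  <!-- Explicitly convert back to byte[] before publishing -->
  <string-to-byte-array-transformer />
  <amqp:outbound-endpoint exchangeName="my-exchange"
                          routingKey="my-key"
                          connector-ref="amqpLocalhostConnector" />
</flow>
```

The final transformer may be redundant given the auto transformation mentioned above; it is spelled out here only to make the conversion visible.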

The transport also takes care of making the properties of inbound messages available as standard Mule Message properties and, conversely, converting properties of Mule Messages into AMQP properties for outbound messages.

Here is the list of properties supported by the transport:

| Basic Properties | Envelope Properties | Technical Properties |
|---|---|---|
| app-id | delivery-tag | consumer-tag |
| content-encoding | exchange | amqp.channel |
| content-type | redelivered | amqp.return.listener |
| correlation-id | routing-key | return.reply-code |
| delivery_mode | | return.reply-text |
| expiration | | return.exchange |
| message-id | | return.routing-key |
| priority | | |
| reply-to | | |
| timestamp | | |
| type | | |
| user-id | | |

On top of that, all custom headers defined in the AMQP basic properties are added as standard Mule properties.
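The sketch below illustrates how these properties might be read and written in a flow. It assumes Mule 3.3 or later (for the set-property element and the MEL expression syntax), and it assumes that the properties of a consumed message are found in the inbound scope while properties to publish are set in the outbound scope. The flow name and the property values are made up for illustration.

```xml
<flow name="amqpPropertiesSketch">
  <amqp:inbound-endpoint queueName="my-queue"
                         connector-ref="amqpLocalhostConnector" />
  <!-- Read two envelope properties of the consumed message -->
  <logger level="INFO"
          message="routing-key=#[message.inboundProperties['routing-key']], redelivered=#[message.inboundProperties['redelivered']]" />
  <!-- Set a basic property on the message about to be published -->
  <set-property propertyName="content-type" value="application/json" />
  <amqp:outbound-endpoint exchangeName="my-exchange"
                          routingKey="my-key"
                          connector-ref="amqpLocalhostConnector" />
</flow>
```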

Configuration Reference

All the configuration parameters supported by the connector and endpoint configuration elements are described in this section.

Connector Attributes

The AMQP connector defines what broker to connect to, which credentials to use when doing so and all the common properties used by the inbound and outbound endpoints using this connector.

It is possible to create several connectors connected to the same broker for the purpose of having different sets of common properties that the endpoints will use.

| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| host | string | no | localhost | The main AMQP broker host to connect to. |
| port | port number | no | 5672 | The port to use to connect to the main AMQP broker. |
| fallbackAddresses | string | no | | A comma-separated list of "host:port" or "host", defining fallback brokers to attempt connection to, should the connection to the main broker fail. |
| virtualHost | string | no | / | The virtual host to connect to on the AMQP broker. |
| username | string | no | guest | The user name to use to connect to the AMQP broker. |
| password | string | no | guest | The password to use to connect to the AMQP broker. |
| deliveryMode | PERSISTENT / NON_PERSISTENT | no | PERSISTENT | The delivery mode to use when publishing to the AMQP broker. |
| priority | | no | 0 | The priority to use when publishing to the AMQP broker. |
| mandatory | boolean | no | false | This flag tells the server how to react if the message cannot be routed to a queue. If this flag is set to true, the server will return any unroutable message to the publisher. If this flag is false, the server silently drops the message. |
| immediate | boolean | no | false | This flag tells the server how to react if the message cannot be routed to a queue consumer immediately. If this flag is set to true, the server will return any undeliverable message to the publisher. If this flag is false, the server will queue the message, but with no guarantee that it will ever be consumed. |
| default-return-endpoint-ref | string | no | | Reference to an endpoint to which AMQP returned messages should be dispatched. |
| ackMode | AMQP_AUTO / MULE_AUTO / MANUAL | no | AMQP_AUTO | The acknowledgment mode to use when consuming from the AMQP broker. |
| prefetchSize | integer | no | 0 | The maximum amount of content (measured in octets) that the server will deliver, 0 if unlimited. |
| prefetchCount | integer | no | 0 | The maximum number of messages that the server will deliver, 0 if unlimited. |
| noLocal | boolean | no | false | If the no-local field is set, the server will not send messages to the connection that published them. |
| exclusiveConsumers | boolean | no | false | Set to true if the connector should only create exclusive consumers. |
| activeDeclarationsOnly | boolean | no | false | Defines if the connector should only do active exchange and queue declarations or can also perform passive declarations to enforce their existence. |
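As an illustration of how several of these attributes can be combined on one connector, here is a sketch whose connector name and values are chosen arbitrarily; only the attribute names come from the table above.

```xml
<!-- Publishing defaults (deliveryMode, priority) and consuming defaults
     (ackMode, prefetchCount) shared by every endpoint using this connector -->
<amqp:connector name="amqpTunedConnectorSketch"
                host="rabbit1"
                port="5672"
                virtualHost="my-vhost"
                username="my-user"
                password="my-pwd"
                deliveryMode="NON_PERSISTENT"
                priority="5"
                ackMode="MANUAL"
                prefetchCount="10" />
```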

Endpoint Attributes

Endpoint attributes are interpreted differently depending on whether they are used on inbound or outbound endpoints. For example, routingKey on an inbound endpoint is used for queue binding, while on an outbound endpoint it is used as a parameter of the basic publish operation.

| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| exchangeName | string | no | | The exchange to publish to or bind queues to. Leave blank or omit for the default exchange. |
| queueName | string | no | | The queue name to consume from. Leave blank or omit for using a new private exclusive server-named queue. |
| routingKey | string | no | | The routing key to use when binding a queue or publishing a message. |
| consumerTag | string | no | | A client-generated consumer tag to establish context. |
| exchangeType | fanout / direct / topic / headers | no | | The type of exchange to be declared. |
| exchangeDurable | boolean | no | | The durability of the declared exchange. |
| exchangeAutoDelete | boolean | no | | Specifies if the declared exchange should be autodeleted. |
| queueDurable | boolean | no | | Specifies if the declared queue is durable. |
| queueAutoDelete | boolean | no | | Specifies if the declared queue should be autodeleted. |
| queueExclusive | boolean | no | | Specifies if the declared queue is exclusive. |
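To make the two meanings of routingKey concrete, the following sketch uses a topic exchange. On the inbound endpoint the routing key acts as a binding pattern (in a topic exchange, * matches exactly one word), while on the outbound endpoint it is the key attached to each published message. The exchange, queue and key names are invented for illustration; the connector is the amqpAutoAckLocalhostConnector used in the examples of this guide.

```xml
<!-- Inbound: routingKey is the pattern used to bind orders-queue to the exchange -->
<amqp:inbound-endpoint exchangeName="my-topic-exchange"
                       exchangeType="topic"
                       exchangeDurable="true"
                       exchangeAutoDelete="false"
                       queueName="orders-queue"
                       queueDurable="false"
                       queueExclusive="false"
                       queueAutoDelete="true"
                       routingKey="orders.*"
                       connector-ref="amqpAutoAckLocalhostConnector" />

<!-- Outbound: routingKey is sent with each published message;
     "orders.created" matches the "orders.*" binding above -->
<amqp:outbound-endpoint exchangeName="my-topic-exchange"
                        exchangeType="topic"
                        exchangeDurable="true"
                        exchangeAutoDelete="false"
                        routingKey="orders.created"
                        connector-ref="amqpAutoAckLocalhostConnector" />
```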

Examples

There are many ways to use the AMQP connector and endpoints. The following examples will demonstrate the common use cases.

Connection fallback

It is possible to define a list of fallback addresses, each in the form host:port or host (implying the default port), that the connector will try if the connection to the main broker fails.

<amqp:connector name="amqpConnectorWithFallback"
                host="rabbit1"
                port="9876"
                fallbackAddresses="rabbit1:9875,rabbit2:5672,rabbit3"
                virtualHost="mule-test"
                username="my-user"
                password="my-pwd" />

Listen to messages with exchange re-declaration and queue creation

This is a typical AMQP pattern where consumers redeclare the exchanges they intend to bind queues to.

<amqp:connector name="amqpAutoAckLocalhostConnector"
                virtualHost="my-vhost"
                username="my-user"
                password="my-pwd"
                activeDeclarationsOnly="true" />

<amqp:inbound-endpoint exchangeName="my-exchange"
                       exchangeType="fanout"
                       exchangeAutoDelete="false"
                       exchangeDurable="true"
                       queueName="my-queue"
                       queueDurable="false"
                       queueExclusive="false"
                       queueAutoDelete="true"
                       connector-ref="amqpAutoAckLocalhostConnector" />

Listen to messages with exchange re-declaration and private queue creation

In this variation of the previous example, Mule will create an exclusive server-named, auto-delete, non-durable queue and bind it to the re-declared exchange.

<amqp:connector name="amqpAutoAckLocalhostConnector"
                virtualHost="my-vhost"
                username="my-user"
                password="my-pwd"
                activeDeclarationsOnly="true" />

<amqp:inbound-endpoint exchangeName="my-exchange"
                       exchangeType="fanout"
                       exchangeAutoDelete="false"
                       exchangeDurable="true"
                       connector-ref="amqpAutoAckLocalhostConnector" />

Listen to messages on a pre-existing exchange

In this mode, the inbound connection will fail if the exchange doesn't pre-exist.

This behavior is enforced by activeDeclarationsOnly=false, which means that Mule will strictly ensure the pre-existence of the exchange before trying to subscribe to it.

<amqp:connector name="amqpAutoAckStrictLocalhostConnector"
                virtualHost="my-vhost"
                username="my-user"
                password="my-pwd"
                activeDeclarationsOnly="false" />
                
<amqp:inbound-endpoint exchangeName="my-exchange"
                       queueName="my-queue"
                       queueDurable="false"
                       queueExclusive="false"
                       queueAutoDelete="true"
                       connector-ref="amqpAutoAckStrictLocalhostConnector" />

Listen to messages on a pre-existing queue

Similarly to the previous example, the inbound connection will fail if the queue doesn't pre-exist.

<amqp:connector name="amqpAutoAckStrictLocalhostConnector"
                virtualHost="my-vhost"
                username="my-user"
                password="my-pwd"
                activeDeclarationsOnly="false" />
                
<amqp:inbound-endpoint queueName="my-queue"
                       connector-ref="amqpAutoAckStrictLocalhostConnector" />

Manual message acknowledgement and rejection

So far, all incoming messages were automatically acknowledged by the AMQP client.

The following example shows how to manually acknowledge or reject messages within a flow, based on criteria of your choice.

<amqp:connector name="amqpManualAckLocalhostConnector"
                virtualHost="my-vhost"
                username="my-user"
                password="my-pwd"
                ackMode="MANUAL" />

<flow name="amqpChoiceAckNackService">
  <amqp:inbound-endpoint queueName="my-queue"
                         connector-ref="amqpManualAckLocalhostConnector" />
  <choice>
    <when ...condition...>
      <amqp:acknowledge-message />
    </when>
    <otherwise>
      <amqp:reject-message requeue="true" />
    </otherwise>
  </choice>
</flow>

Flow control

Expanding on the previous example, it is possible to throttle the delivery of messages by configuring the connector accordingly.

The following demonstrates a connector that fetches messages one by one and a flow that uses manual acknowledgment to throttle the message delivery.

<amqp:connector name="amqpThrottledConnector"
                virtualHost="my-vhost"
                username="my-user"
                password="my-pwd"
                prefetchCount="1"
                ackMode="MANUAL" />

<flow name="amqpManualAckService">
  <amqp:inbound-endpoint queueName="my-queue"
                         connector-ref="amqpThrottledConnector" />
  <!--
  components, routers... go here
  -->
  <amqp:acknowledge-message />
</flow>

Publish messages to a redeclared exchange

This is a typical AMQP pattern where producers redeclare the exchanges they intend to publish to.

<amqp:connector name="amqpLocalhostConnector"
                virtualHost="my-vhost"
                username="my-user"
                password="my-pwd"
                activeDeclarationsOnly="true" />

<amqp:outbound-endpoint routingKey="my-key"
                        exchangeName="my-exchange"
                        exchangeType="fanout"
                        exchangeAutoDelete="false"
                        exchangeDurable="false"
                        connector-ref="amqpLocalhostConnector" />

Publish messages to a pre-existing exchange

It is also possible to publish to a pre-existing exchange:

<amqp:outbound-endpoint exchangeName="my-exchange"
                        connector-ref="amqpLocalhostConnector" />

It can be desirable to strictly enforce the existence of this exchange before publishing to it.

This is done by configuring the connector to perform passive declarations:

<amqp:connector name="amqpStrictLocalhostConnector"
                virtualHost="my-vhost"
                username="my-user"
                password="my-pwd"
                activeDeclarationsOnly="false" />
                
<amqp:outbound-endpoint routingKey="my-key"
                        exchangeName="my-exchange"
                        connector-ref="amqpStrictLocalhostConnector" />

Message level override of exchange and routing key

It is possible to override some outbound endpoint attributes with outbound-scoped message properties:

  • routing-key overrides the routingKey attribute,
  • exchange overrides the exchangeName attribute.
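As a sketch of such an override, the flow below sets the routing-key outbound property just before the outbound endpoint, so the message would be published with that key instead of the endpoint's routingKey attribute. It assumes Mule 3.3 or later for the set-property element, which writes to the outbound scope; the flow name and the key value are made up for illustration.

```xml
<flow name="amqpRoutingKeyOverrideSketch">
  <!--
  inbound endpoint, components, routers ...
  -->

  <!-- Outbound-scoped property taking precedence over routingKey="my-key" -->
  <set-property propertyName="routing-key" value="my-other-key" />

  <amqp:outbound-endpoint routingKey="my-key"
                          exchangeName="my-exchange"
                          connector-ref="amqpLocalhostConnector" />
</flow>
```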

Mandatory and immediate deliveries and returned message handling

The connector supports the mandatory and immediate publication flags, as shown hereafter:

<amqp:connector name="mandatoryAmqpConnector" virtualHost="mule-test" username="mule" password="elum" mandatory="true" immediate="true" />

If a message sent with this connector can't be delivered, the AMQP broker will return it asynchronously.

The AMQP transport offers the possibility to dispatch these returned messages to user defined endpoints for custom processing.

You can define the endpoint in charge of handling returned messages at connector level. Here is an example that targets a VM endpoint:

<vm:endpoint name="globalReturnedMessageChannel" path="global.returnedMessages" />

<amqp:connector name="mandatoryAmqpConnector" virtualHost="mule-test" username="mule" password="elum" mandatory="true" default-return-endpoint-ref="globalReturnedMessageChannel" />

It is also possible to define the returned message endpoint at flow level:

<vm:endpoint name="flowReturnedMessageChannel" path="flow.returnedMessages" />

<flow name="amqpMandatoryDeliveryFailureFlowHandler">
  <!--
  inbound endpoint, components, routers ...
  -->

  <amqp:return-handler>
    <vm:outbound-endpoint ref="flowReturnedMessageChannel" />
  </amqp:return-handler>

  <amqp:outbound-endpoint routingKey="my-key"
                          exchangeName="my-exchange"
                          connector-ref="mandatoryAmqpConnector" />
</flow>

If both are configured, the handler defined in the flow will supersede the one defined in the connector.

If none is configured, Mule will log a warning with the full details of the returned message.
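Returned messages dispatched to one of these endpoints can then be processed like any other message. Below is a minimal sketch of a flow that consumes from the globalReturnedMessageChannel VM endpoint defined above and logs the reason for the return. It assumes Mule 3.3 or later for the MEL syntax, and it assumes that the return.* properties listed earlier in this guide are visible in the inbound scope after crossing the VM endpoint; adjust the scope if they are not. The flow name is made up.

```xml
<flow name="returnedMessagesLoggerSketch">
  <vm:inbound-endpoint ref="globalReturnedMessageChannel" />
  <!-- Log why the broker returned the message and where it was headed -->
  <logger level="WARN"
          message="AMQP message returned: code=#[message.inboundProperties['return.reply-code']], text=#[message.inboundProperties['return.reply-text']], exchange=#[message.inboundProperties['return.exchange']], routing-key=#[message.inboundProperties['return.routing-key']]" />
</flow>
```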

Request-response publication

It is possible to perform synchronous (request-response) outbound operations:

<amqp:outbound-endpoint routingKey="my-key"
                        exchange-pattern="request-response"
                        exchangeName="my-exchange"
                        connector-ref="amqpLocalhostConnector" />

In that case, Mule will:

  • create a temporary auto-delete private reply queue,
  • set it as the reply-to property of the current message,
  • publish the message to the specified exchange,
  • wait for a response to be sent to the reply-queue (via the default exchange).

Programmatic message requesting

It is possible to programmatically get messages from an AMQP queue.

To do so, first build a URI that identifies the AMQP queue you want to consume from. Here is the syntax to use, with optional parameters in square brackets:

amqp://[${exchangeName}/]amqp-queue.${queueName}[?connector=${connectorName}[&...other parameters...]]

For example, the following identifies a pre-existing queue named "my-queue" and will consume from it with the only AMQP connector available in the Mule configuration:

amqp://amqp-queue.my-queue

This example will create a non-durable, auto-delete, non-exclusive queue named "new-queue" and bind it to a pre-existing exchange named "my-exchange", using the specified connector:

amqp://my-exchange/amqp-queue.new-queue?connector=amqpAutoAckLocalhostConnector&queueDurable=false&queueExclusive=false&queueAutoDelete=true

With such a URI defined, it is possible to retrieve a message from the queue using the Mule Client, as shown in the following code sample:

MuleMessage message = new MuleClient(muleContext).request("amqp://amqp-queue.my-queue", 2500L);

The above will wait up to 2.5 seconds for a message and will return null if none has shown up in the queue by then.