Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add amqp exchange routing pattern and ref in channel operation #259

Open
wants to merge 6 commits into
base: master
Choose a base branch
from

Conversation

timonback
Copy link

Description

In Java Spring applications, it is possible to define an application that connects to an amqp broker, defines exchanges, queues and listens to incoming message on these queues.

My current understanding of amqp:

  1. A publisher connects to a broker
  2. The publisher sends a message with a routing key to an exchange
  3. The broker matches the message's routing key with the routing patterns of the defined exchanges and forwards the message to those queues.
  4. (Bindings are used to connect an exchange with a queue, which do have a n-to-m mapping)
  5. The consumer receives message from the queue

In AsyncAPI, the amqp binding supports two types of channels: routingKey (which is an exchange) and queue.
Unfortunately, there is no link between the two.

Proposal:
Add (optional) fields:

  1. name to specify the actual routing pattern used (in the (topic) exchange or amqp binding)
  2. channel.$ref to connect the is=routingKey channel type to the is=queue channel type.

I am looking forward to your thoughts.

Example:

asyncapi: 3.0.0
info:
  title: Springwolf example project - AMQP
  version: 1.0.0
  description: Springwolf example project to demonstrate springwolfs abilities
defaultContentType: application/json
servers:
  amqp-server:
    host: amqp:5672
    protocol: amqp
channels:
  queue-update-id_#_CRUD-topic-exchange-1-id:
    address: CRUD-topic-exchange-1
    messages:
      java.lang.String:
        $ref: "#/components/messages/java.lang.String"
    bindings:
      amqp:
        is: routingKey
        exchange:
          name: CRUD-topic-exchange-1
          type: topic
          durable: true
          autoDelete: false
          vhost: /
        bindingVersion: 0.3.0
  queue-update_id:
    address: queue-update
    bindings:
      amqp:
        is: queue
        queue:
          name: queue-update
          durable: false
          exclusive: false
          autoDelete: false
          vhost: /
        bindingVersion: 0.3.0
components:
  schemas:
    SpringRabbitListenerDefaultHeaders:
      type: object
      properties: {}
      examples:
        - {}
    java.lang.String:
      title: String
      type: string
      examples:
        - '"string"'
  messages:
    java.lang.String:
      headers:
        $ref: "#/components/schemas/SpringRabbitListenerDefaultHeaders"
      payload:
        schemaFormat: application/vnd.aai.asyncapi+json;version=3.0.0
        schema:
          $ref: "#/components/schemas/java.lang.String"
      name: java.lang.String
      title: String
      bindings:
        amqp:
          bindingVersion: 0.3.0
operations:
  queue-update-id_#_CRUD-topic-exchange-1-id_receive_bindingsUpdate:
    action: receive
    channel:
      $ref: "#/channels/queue-update-id_#_CRUD-topic-exchange-1-id"
    bindings:
      amqp:
        expiration: 0
        bindingVersion: 0.3.0
    messages:
      - $ref: "#/channels/queue-update-id_#_CRUD-topic-exchange-1-id/messages/java.lang.String"

Copy link

@github-actions github-actions bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Welcome to AsyncAPI. Thanks a lot for creating your first pull request. Please check out our contributors guide useful for opening a pull request.
Keep in mind there are also other channels you can use to interact with AsyncAPI community. For more details check out this issue.

@timonback timonback changed the title feat: Add amqp exchange routing pattern and ref in channel operation feat: add amqp exchange routing pattern and ref in channel operation Aug 27, 2024
@Pakisan
Copy link
Member

Pakisan commented Aug 28, 2024

@timonback this repo is destination point

Start from creating PR in schemas repo - https://github.com/asyncapi/spec-json-schemas/tree/master/bindings/amqp

Create new version 0.4.0 and change schema

@timonback
Copy link
Author

Thank you @Pakisan for the pointing out the correct repo. I re-created this PR as asyncapi/spec-json-schemas#555

@Pakisan
Copy link
Member

Pakisan commented Oct 9, 2024

@jonaslagoni @dalelane @smoya @GreenRover please, take a look

this PR is ready asyncapi/spec-json-schemas#555

@Pakisan
Copy link
Member

Pakisan commented Oct 16, 2024

@GreenRover
Copy link
Collaborator

Sorry my knowledge about amqp is very low, i am not sure if i can help.

Copy link
Member

@derberg derberg left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the late review @timonback
I love the work you're doing with Springwolf!

I have some general comments except for the inline ones I left in a PR:

  • would be good if you could update the example from PR description with the new binding (adding some inline comment about the new addition) to visualize the added change
  • since new fields are added to the binding, it also means you need to update the version value in the README.md - same to do in json schema
  • for the sake of consistency, please have a look on how exchange object is defined. First the Map is described and then specific properties, like name and type. Please follow the same approach with new channel option. Although in your case it is even easier as channel will be of type Reference Object like in https://github.com/asyncapi/spec/blob/master/spec/asyncapi.md#operation-reply-object
  • make it clear in description that we are talking about AsyncAPI channel only, and $ref only. Have a look at how we do it in case of channel reference in reply object.

also cc @jonaslagoni that did a migration of the binding to v3, maybe he also have something to add

amqp/README.md Outdated
@@ -54,6 +56,9 @@ channels:
bindings:
amqp:
is: routingKey
name: routing.pattern
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

technically shouldn't there be something matching this name in the userSignup to make this example make sense?

@jonaslagoni
Copy link
Member

@derberg did not go into specifics of whether something made sense for AMQP, so I will stay out of the specifics :) Already reviewed asyncapi/spec-json-schemas#555

@timonback
Copy link
Author

Thank you for the review @derberg!

I have updated the PR to:

  • update the description, including link to reference object
  • differentiate between exchange and queue by channel id
  • update the version

Example:
Producer sends a message to the exchange address signup with the routingKey user.signup.
Because a TopicExchange is used, the routingKey is identical to the queue address and the message gets routed to the user.signup queue.
Consumer will get message from user.signup queue.


This proposal adds a 1:1 mapping between exchange and queue to indicate and thereby implementing the simple TopicExchange case.

Based on the amqp implemention in Spring Boot, it is possible that 1 exchange routes to multiple different queues using different routing keys (i.e. topic exchange user.signup, company.signup)
The current implementation in Springwolf will an exchange channel per binding/routing key (by using different channels ids). I wonder whether this exchange duplication is ideal?

Similarly, an exchange may have multiple amqp bindings. For example, an exchange can be of type=topic for one routingKey and of type=fanout for a different one. Creating multiple amqp (exchange) channel bindings is probably not intended and duplication the best work-around (for now).

@derberg
Copy link
Member

derberg commented Nov 4, 2024

@timonback not sure I get it, but if you say that an exchange can have routingKey pointing to multiple channels, why not making channel actually channels and an array of $ref?

look at this example from the spec: https://www.asyncapi.com/docs/reference/specification/v3.0.0#channelObject where you can specify that channel is available only on specific servers, notice that one can specify servers array of reference objects

@pdalfarr
Copy link

pdalfarr commented Nov 9, 2024

Hi everyone,

I have be asked by @timonback to "give some clarity to #259 and how exchanges and queues are connected in amqp? (is it a n:m mapping?)"

I am not really an expert, but I'll try ;-)
Here is an answer from Copilot, answer that I changed a bit.
I am not 100% sure this correct, but it the explanation bellow makes sense to me, so I am reasonably confident.

"
In RabbitMQ, exchanges and queues are connected through bindings.
Here’s a brief overview of how it works:

  • Exchanges: These are message routing agents that receive messages from producers and route them to queues based on certain criteria.
  • Queues: These store messages until they are consumed by a consumer.
  • Bindings: These are the links between an exchange and a queue. A binding can include a routing key or pattern that the exchange uses to determine how to route messages to the queue.

Regarding the n:m mapping:

Yes, it is possible to have an n:m mapping between exchanges and queues.
This means that multiple exchanges can route messages to multiple queues, and vice versa. This is achieved through multiple bindings.
For example, you can bind multiple queues to a single exchange, and you can also bind a single queue to multiple exchanges12.
"

This answer is 'in line' with my current understanding of how exchanges, bindings and queues are linked together.

I also add a copy of a comment I posted on SpringWolf's Github about that:

"
Just adding a link and a picture to illustrate the different kind of exchanges supported by RabbitMQ:

284951027-8d12b168-9961-45d0-871d-193dfcee3a56

src: https://hevodata.com/learn/rabbitmq-exchange-type/

So the 'complete chain' is:

Producer > Channel > Exchange > Binding > Routing Key > Queue > Consumer
"

So, theoretically, I think that we could have:

  • 2 exchanges, Exchange_1 and Exchange_2 ( each exchange type can be either "Topic" or "Direct"),
  • a Binding_1 with exchange = Exchange_1 and routing key = Queue_1
  • a Binding_2 with exchange = Exchange_2 and routing key = Queue_1 (same routing key as binding 1)
  • and 1 Queue, Queue_1.

I don't know if it's good practice or not, but I suppose it's a valid use case.
Visually, this would be :

image

@timonback
Copy link
Author

Thank you @pdalfarr for this explanation and pictures.

I'll attempt to summarize this GH issue

  1. AsyncAPI uses the concept of channels to describe an entity that can receive/send messages.
  2. In AsyncAPI operations are used to describe individual messages that is either received/send on one channel.
  3. Amqp has the concept of an exchange (channel in AsyncAPI), where an application send message to.
  4. Amqp has the concept of a queue (also channel in AsyncAPI), where an application can receive a message from.
  5. Amqp exchanges and queues allow for a m:n mapping, also in multiple layers (exchange1 -> exchange2 -> queue1 + queue2)
  6. AsyncAPI operation cannot be used to describe this broker routing, as associates 1 channel + receive/send with messages.

For details, check @pdalfarr comment above: #259 (comment)

Original issue

It would be great to have a way to describe, how messages are routed on the broker

Initial idea/improvement

Allow to link a channel (exchange) to another (1) channel (queue/exchange).

Problem:

  • amqp is actually an n:m mapping
  • Only one channel binding is supported for amqp, while one exchange (channel) may have multiple targets and therefore routingKeys

Also, how to translate amqp bindings (basically the m:n mapping) to AsyncAPI?

Alternative

Rather use the operation object to bind a channel (exchange) to a amqp queue.
The target queue + routingKey could be part of the amqp operation binding.


What are your thoughts?
Does this fit into the current AsyncAPI version?

(As I am not familiar enough with amqp, I cannot and do not plan a re-design of the amqp binding. If it turns out, that the use case is a non-goal or design limitation, that is fine with me.)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants