Skip to content

Conversation

@nashjain
Copy link

@nashjain nashjain commented Sep 9, 2025

  • Added support for AsyncAPI v3
  • Also cleaned up the spec to make it very clear that step-object can be oneOf openapi-step-object or asyncapi-step-object or workflow-step-object
  • For AsyncAPI we really need support for timeout, fork and join. However, these are also useful for OpenAPI so added it at the base step object.
  • For OpenAPI we need at least one successCriteria but for AsyncAPI it can be optional.

While we've tried to incorporate as much as possible from #270 not everything is covered.

Also cleaned up the spec to make it very clear that step-object can be oneOf openapi-step-object or asyncapi-step-object or workflow-step-object
For AsyncAPI we really need support for timeout, fork and join. However, these are also useful for OpenAPI so added it at the base step object.
For OpenAPI we need at least one successCriteria but for AsyncAPI it can be optional.
@kevinduffey
Copy link
Collaborator

Let's have you join Nick, Mike and myself next week and the next few weeks for some follow up on how this works. We started meeting to discuss the best approach forward as well. As this would fundamentally alter the spec we'll definitely need a bit of time to go through what you have here and what we are looking in to.. see what might overlap, etc. Shoot me a msg on slack with your email so I can add you to our conversations there.

@nashjain
Copy link
Author

Here is a simple example just for your reference.

arazzo: "1.0.1"
info:
  title: "Workflow for placing an Order"
  version: "1.0.0"
sourceDescriptions:
- name: "OrderApi"
  url: "./openapi/order.yaml"
  type: "openapi"
- name: "AsyncOrderApi"
  url: "./asyncapi/order.yaml"
  type: "asyncapi"
workflows:
- workflowId: "PlaceOrder"
  inputs:
    required:
    - "CreateOrder"
    type: "object"
    properties:
      CreateOrder:
        required:
        - "orderRequestId"
        - "productId"
        - "quantity"
        type: "object"
        properties:
          orderRequestId:
            type: "string"
          productId:
            type: "integer"
          quantity:
            type: "integer"
  steps:
  - stepId: "CreateOrder"
    operationId: "$sourceDescriptions.AsyncOrderApi.PlaceOrder"
    stepType: "asyncapi"
    action: "send"
    parameters:
    - name: "orderRequestId"
      in: "header"
      value: "$inputs.CreateOrder.orderRequestId"
    requestBody:
      payload:
        productId: "$inputs.CreateOrder.productId"
        quantity: "$inputs.CreateOrder.quantity"
    outputs:
      orderRequestId: "$message.header.orderRequestId"
  - stepId: "ConfirmOrder"
    operationId: "$sourceDescriptions.AsyncOrderApi.PlaceOrder"
    stepType: "asyncapi"
    action: "receive"
    correlationId: "$steps.CreateOrder.outputs.orderRequestId"
    timeout: 6000
    outputs:
      orderId: "$message.body.orderId"
  - stepId: "GetOrderDetails"
    operationId: "$sourceDescriptions.OrderApi.getOrder"
    parameters:
    - name: "orderId"
      in: "path"
      value: "$steps.ConfirmOrder.outputs.orderId"
    successCriteria:
    - condition: "$statusCode == 200"
components: {}

@nashjain
Copy link
Author

As discussed, here is an example with fork and join

arazzo: "1.0.1"
info:
  title: "Workflow for placing an Order"
  version: "1.0.0"
sourceDescriptions:
- name: "OrderApi"
  url: "./openapi/order.yaml"
  type: "openapi"
- name: "AsyncOrderApi"
  url: "./asyncapi/order.yaml"
  type: "asyncapi"
workflows:
- workflowId: "PlaceOrder"
  inputs:
    required:
    - "CreateOrder"
    type: "object"
    properties:
      CreateOrder:
        required:
        - "orderRequestId"
        - "productId"
        - "quantity"
        type: "object"
        properties:
          orderRequestId:
            type: "string"
          productId:
            type: "integer"
          quantity:
            type: "integer"
  steps:
  - stepType: "asyncapi"
    stepId: "ConfirmOrder"
    operationId: "$sourceDescriptions.AsyncOrderApi.PlaceOrder"
    action: "receive"
    correlationId: "$inputs.CreateOrder.orderRequestId"
    fork: true # Converts ConfirmOrder to a Non Blocking Step
    timeout: 6000
    outputs:
      orderId: "$message.body.orderId"
  - stepType: "asyncapi"
    stepId: "CreateOrder"
    operationId: "$sourceDescriptions.AsyncOrderApi.PlaceOrder"
    action: "send"
    parameters:
    - name: "orderRequestId"
      in: "header"
      value: "$inputs.CreateOrder.orderRequestId"
    requestBody:
      payload:
        productId: "$inputs.CreateOrder.productId"
        quantity: "$inputs.CreateOrder.quantity"
  - stepId: "GetOrderDetails"
    operationId: "$sourceDescriptions.OrderApi.getOrder"
    join: # Waits for ConfirmOrder to complete or timeout
     - ConfirmOrder # Can also be 'true' to join/wait all
    parameters:
    - name: "orderId"
      in: "path"
      value: "$steps.ConfirmOrder.outputs.orderId"
    successCriteria:
    - condition: "$statusCode == 200"
components: {}

@frankkilcommins
Copy link
Collaborator

Thanks @nashjain - I will try to propose changes to the specification based on the examples later this week. There will no doubt be a few finer grained points that we'll need to discuss

@frankkilcommins
Copy link
Collaborator

frankkilcommins commented Oct 29, 2025

Taking the above examples into consideration here's a proposal outlining the specification changes to support AsyncAPI in v1.1.0. I've also provided an updated example which complies to the this proposal below. If we're in general agreement, I'll update this PR to reflect the proposal.

Summary of the specification changes to add AsyncAPI support (not exhaustive)

Source Description Object

type

Updated Allowed Values:

  • openapi
  • arazzo
  • asyncapi (new)

Description/Rationale:
The addition of asyncapi enables referencing AsyncAPI 3.0.0 definitions and allows workflows to model message-driven systems in addition to traditional request/response patterns.

Runtime Expressions

$message

Type: object
Available In: Steps with kind: asyncapi and action: receive
Description:
Provides access to the body and headers of a message received via an AsyncAPI-defined channel. This runtime expression enables event-driven workflows to assert success or extract outputs from message data.

Example:

successCriteria:
  - condition: $message.payload != null
  - condition: $message.header.correlationId == 'abc123'

$elapsedTime (optional - nice to have)

Type: integer
Available In: After a step completes
Description:
Represents the total time (in milliseconds) that a step took to execute. Can be used in successCriteria and onFailure.criteria to enforce nuanced timeout behaviour or performance related onFailure / onSuccess behaviour.

Example:

successCriteria:
  - condition: $elapsedTime < 5000

Step Object

kind

Type: string
Allowed Values: openapi, asyncapi, workflow
Description:
Indicates the type of step. Required when the document contains multiple sourceDescriptions of different types. Enables correct interpretation of fields like operationId, action, etc. We'll clearly explain to implementors how to infer this if omitted and what defaults should be. The generally thought process is that this is good moving forward but we can't make mandatory in minor release. It should be present for those looking to express async types of steps

action

Type: string
Allowed Values: send, receive
Required If: kind is asyncapi
Description: |
Indicates whether the step is sending (publishing) or receiving (subscribing to) a message on a channel described in an AsyncAPI document.

timeout

Type: integer
Format: milliseconds
** Description:** |
Defines the maximum allowed execution time for a step in milliseconds. If the step does not complete within the specified duration, it is considered a failure. The default behavior upon timeout is to terminate the workflow equivalent to using an end failure action.

Example:

timeout: 5000

Timeout behavior can be overridden by defining an onFailure block with criteria based on $elapsedTime. This would allow retry behaviour etc. if needed.

Example:

onFailure:
  - name: retryTimeout
    type: retry
    retryLimit: 3
    retryAfter: 1000
    criteria:
      - condition: $elapsedTime >= 5000

correlationId

Type: string (value runtime expression or literal)
Description:
Used in asyncapi steps to associate messages across send/receive operations. Typically references an ID passed in the message header or payload to correlate requests and responses.

Example:

correlationId: $inputs.CreateOrder.orderRequestId

dependsOn

Type: string array
Description:
Specifies a list of step identifiers that must complete (or be waited for) before the current step can begin execution. This enables modelling of explicit execution dependencies within a workflow. Note about forking: we leaning towards not having an explicit fork property in asyncapi kind steps. Instead we can assume that any type of such step with action: receive is by default non-blocking (or asynchronous) in nature. Other steps can leverage dependsOn to ensure the joining type of behaviour.

Example:

dependsOn:
  - $steps.ConfirmOrder

Step Execution Semantics

A step is considered successful only when all successCriteria are satisfied. If any condition fails, the step is deemed to have failed, and onFailure logic (if defined) is evaluated and executed.

There is no dedicated timeout field.
Instead, timeout behavior must be expressed using $elapsedTime within the successCriteria.

Example:

successCriteria:
  - condition: $statusCode == 200
  - condition: $elapsedTime < 5000

onFailure:
  - name: handleTimeout
    type: end
    criteria:
      - condition: $elapsedTime >= 5000

Updated Example:

arazzo: 1.1.0
info:
  title: Workflow for placing an Order
  version: 1.0.0
sourceDescriptions:
  - name: OrderApi
    url: ./openapi/order.yaml
    type: openapi
  - name: AsyncOrderApi
    url: ./asyncapi/order.yaml
    type: asyncapi
workflows:
  - workflowId: PlaceOrder
    inputs:
      required:
        - CreateOrder
      type: object
      properties:
        CreateOrder:
          required:
            - orderRequestId
            - productId
            - quantity
          type: object
          properties:
            orderRequestId:
              type: string
            productId:
              type: integer
            quantity:
              type: integer
    steps:
      - kind: asyncapi
        stepId: ConfirmOrder
        operationId: $sourceDescriptions.AsyncOrderApi.PlaceOrder
        action: receive # Non Blocking Step by default
        timeout: 6000
        correlationId: $inputs.CreateOrder.orderRequestId
        successCriteria:
          - condition: $message.payload != null
        outputs:
          orderId: $message.body.orderId

      - kind: asyncapi
        stepId: CreateOrder
        operationId: $sourceDescriptions.AsyncOrderApi.PlaceOrder
        action: send
        parameters:
          - name: orderRequestId
            in: header
            value: $inputs.CreateOrder.orderRequestId
        requestBody:
          payload:
            productId: $inputs.CreateOrder.productId
            quantity: $inputs.CreateOrder.quantity

      - stepId: GetOrderDetails
        operationId: $sourceDescriptions.OrderApi.getOrder
        dependsOn:
          - $steps.ConfirmOrder
        parameters:
          - name: orderId
            in: path
            value: $steps.ConfirmOrder.outputs.orderId
        successCriteria:
          - condition: $statusCode == 200
components: {}

@nashjain
Copy link
Author

Thanks @frankkilcommins The proposal mostly looks good to me. I just need sometime to think through a couple of items. I'm currently in Australia. Next week, once I'm back we could jump on a call to discuss and close it.

@kevinduffey
Copy link
Collaborator

@nashjain We discussed a bit about the removal of fork: true and kind: async implicitly indicates a fork, so removing that and then using dependsOn instead of join since we have dependsOn in the spec already elsewhere. Also the nature of a "fire and forget" (dont need response) vs "fire and wait for a response" which basically assumes if a dependsOn isn't indicating a given async kind that has a correlation id (or maybe thats not even needed) that it would indicate a fire/forget scenario. What do you think? If you can join the next meeting we can discuss on that call that would be great.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants