Airflow DAG trigger with Amazon SQS Asset watcher trigger not working properly #51213

@ajayganti3

Description

Apache Airflow Provider(s)

amazon

Versions of Apache Airflow Providers

apache-airflow-providers-amazon 9.8.0

Apache Airflow version

3.0.1

Operating System

Ubuntu 22.04.3 LTS

Deployment

Docker-Compose

Deployment details

I'm using the official Airflow Docker Compose file to spin up Airflow 3.0.1.

What happened

I have a DAG configured with an Amazon SQS trigger on Airflow 3.0.1, using the Airflow assets approach to auto-trigger the DAG when a message arrives in the queue. The DAG works as expected for a short period (approximately 30 minutes), triggering whenever a message is sent to the SQS queue.

However, after this initial period:

  • The DAG stops triggering despite new messages being available in the SQS queue.
  • Restarting Airflow using docker compose restart temporarily restores the triggering functionality.
  • After another ~30 minutes, the problem recurs.
  • In some cases, even after restarting, the DAG never triggers again.
  • The aws_default connection used for the SQS integration remains valid and operational (confirmed via manual testing).
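
For reference, connectivity to the queue can be confirmed outside Airflow with something like the following (a minimal boto3 sketch; it assumes the default credential chain matches the aws_default connection, and the queue URL placeholder is the same one used in the DAG below):

import boto3

# Placeholder queue URL; replace <my_account_id> and <my_queue_name>
SQS_QUEUE = "https://sqs.us-east-1.amazonaws.com/<my_account_id>/<my_queue_name>"

# Assumes the default boto3 credential chain matches the aws_default connection
sqs = boto3.client("sqs", region_name="us-east-1")

# If credentials and IAM permissions are valid, this returns queue attributes
attrs = sqs.get_queue_attributes(
    QueueUrl=SQS_QUEUE,
    AttributeNames=["ApproximateNumberOfMessages"],
)
print(attrs["Attributes"]["ApproximateNumberOfMessages"])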

What you think should happen instead

The SQS-triggered DAG should:

  • Continuously monitor the queue and trigger the DAG when a new message arrives.
  • Remain functional indefinitely without needing to restart Airflow.
  • Provide meaningful logs or error messages if/when it stops functioning.

How to reproduce

  1. Use the official Airflow 3.0.1 Docker Compose file.
  2. Deploy with Docker Compose and start the Airflow environment.
  3. Define a DAG with an SQS trigger using Airflow assets.
  4. Set up a valid aws_default connection with appropriate IAM permissions.
  5. Send messages to the SQS queue and observe that the DAG is triggered.
  6. Wait a while, then send another message. (Or use a separate Python script that sends messages to the queue at regular intervals; see the helper sketch after the DAG code below.)
  7. Observe that the DAG is no longer triggered.
  8. Restart Airflow with docker compose restart and observe that triggering works again briefly, then fails. (Sometimes the DAG never triggers again even after the restart.)
  9. Below is the Airflow DAG that I'm using:
from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
from airflow.sdk import Asset, AssetWatcher, dag, task

# Define the SQS queue URL
# Replace my_account_id and my_queue_name
SQS_QUEUE = "https://sqs.us-east-1.amazonaws.com/<my_account_id>/<my_queue_name>"

# Define a trigger that listens to an external message queue (AWS SQS in this case)
trigger = MessageQueueTrigger(
    aws_conn_id="aws_default",
    queue=SQS_QUEUE,
    waiter_delay=10,  # delay in seconds between polls
)

# Define an asset that watches for messages on the queue
sqs_queue_asset = Asset(
    "sqs_queue_asset", watchers=[AssetWatcher(name="sqs_watcher", trigger=trigger)]
)


# Schedule the DAG to run when the asset is triggered
@dag(schedule=[sqs_queue_asset])
def event_driven_dag():
    @task
    def process_message(**context):
        # Extract the triggering asset events from the context
        triggering_asset_events = context["triggering_asset_events"]
        for event in triggering_asset_events[sqs_queue_asset]:
            # Get the message from the TriggerEvent payload
            print(
                f"Processing message: {event.extra["payload"]["message_batch"][0]["Body"]}"
            )

    process_message()


event_driven_dag()
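
For step 6 above, a helper script along these lines can send test messages at regular intervals (a minimal sketch; the queue URL placeholder and the 60-second interval are illustrative, not part of the original setup):

import json
import time

import boto3

# Placeholder queue URL; replace <my_account_id> and <my_queue_name>
SQS_QUEUE = "https://sqs.us-east-1.amazonaws.com/<my_account_id>/<my_queue_name>"

sqs = boto3.client("sqs", region_name="us-east-1")

# Send a message every minute so the ~30-minute failure window is easy to observe
while True:
    response = sqs.send_message(
        QueueUrl=SQS_QUEUE,
        MessageBody=json.dumps({"sent_at": time.time()}),
    )
    print(f"Sent message {response['MessageId']}")
    time.sleep(60)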
