Triggerer crash when migrating Airflow 2 to 3 with async dagrun already in deferred state  #55713

@atul-astronomer

Apache Airflow version

3.1.0b2

If "Other Airflow 2 version" selected, which one?

No response

What happened?

The triggerer crashes after the migration with the following error:

File "/opt/airflow/airflow-core/src/airflow/utils/providers_configuration_loader.py", line 54, in wrapped_function
    return func(*args, **kwargs)
  File "/opt/airflow/airflow-core/src/airflow/cli/commands/triggerer_command.py", line 69, in triggerer
    run_command_with_daemon_option(
  File "/opt/airflow/airflow-core/src/airflow/cli/commands/daemon_utils.py", line 86, in run_command_with_daemon_option
    callback()
  File "/opt/airflow/airflow-core/src/airflow/cli/commands/triggerer_command.py", line 72, in <lambda>
    callback=lambda: triggerer_run(args.skip_serve_logs, args.capacity, triggerer_heartrate),
  File "/opt/airflow/airflow-core/src/airflow/cli/commands/triggerer_command.py", line 55, in triggerer_run
    run_job(job=triggerer_job_runner.job, execute_callable=triggerer_job_runner._execute)
  File "/opt/airflow/airflow-core/src/airflow/utils/session.py", line 100, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/airflow/airflow-core/src/airflow/jobs/job.py", line 368, in run_job
    return execute_job(job, execute_callable=execute_callable)
  File "/opt/airflow/airflow-core/src/airflow/jobs/job.py", line 397, in execute_job
    ret = execute_callable()
  File "/opt/airflow/airflow-core/src/airflow/jobs/triggerer_job_runner.py", line 170, in _execute
    self.trigger_runner.run()
  File "/opt/airflow/airflow-core/src/airflow/jobs/triggerer_job_runner.py", line 526, in run
    self.load_triggers()
  File "/opt/airflow/airflow-core/src/airflow/traces/tracer.py", line 58, in wrapper
    return func(*args, **kwargs)
  File "/opt/airflow/airflow-core/src/airflow/jobs/triggerer_job_runner.py", line 549, in load_triggers
    self.update_triggers(set(ids))
  File "/opt/airflow/airflow-core/src/airflow/jobs/triggerer_job_runner.py", line 653, in update_triggers
    ser_ti = workloads.TaskInstance.model_validate(
  File "/usr/python/lib/python3.10/site-packages/pydantic/main.py", line 705, in model_validate
    return cls.__pydantic_validator__.validate_python(
pydantic_core._pydantic_core.ValidationError: 1 validation error for TaskInstance
dag_version_id
  UUID input should be a string, bytes or UUID object [type=uuid_type, input_value=None, input_type=NoneType]
    For further information visit https://errors.pydantic.dev/2.11/v/uuid_type
INFO:     Shutting down

Note: The triggerer works fine if the async DAG was not in the deferred state during the migration.
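
The validation error reports dag_version_id as None for the task instance attached to a trigger. As a rough way to spot such rows before starting the triggerer, the sketch below queries for deferred task instances without a dag version. It runs against an in-memory SQLite table that is a hypothetical, simplified stand-in for the task_instance table; the column set, the sample rows, and the helper name find_unversioned_deferred are assumptions for illustration, not Airflow's actual schema or API.

```python
import sqlite3

# Hypothetical, simplified stand-in for Airflow's task_instance table.
# Only the columns relevant to this issue are modelled; the real schema differs.
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE task_instance (
        dag_id TEXT,
        task_id TEXT,
        run_id TEXT,
        state TEXT,
        trigger_id INTEGER,
        dag_version_id TEXT
    )
    """
)
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?, ?, ?, ?)",
    [
        # Deferred with no dag version recorded (the case from the traceback).
        ("async_trigger_sleep", "wait_for_start", "manual__1", "deferred", 1, None),
        # Deferred with a dag version present (made-up UUID).
        ("other_dag", "wait", "manual__2", "deferred", 2,
         "0198c7a2-0000-7000-8000-000000000001"),
        # Not deferred, so no trigger is attached to it.
        ("third_dag", "done", "manual__3", "success", None, None),
    ],
)


def find_unversioned_deferred(connection):
    """Return (dag_id, task_id, run_id) for deferred rows lacking a dag version."""
    rows = connection.execute(
        """
        SELECT dag_id, task_id, run_id
        FROM task_instance
        WHERE state = 'deferred'
          AND trigger_id IS NOT NULL
          AND dag_version_id IS NULL
        ORDER BY dag_id, task_id, run_id
        """
    )
    return rows.fetchall()


affected = find_unversioned_deferred(conn)
print(affected)
```

Against a real metadata database the same WHERE clause could be adapted, but the table and column names should be checked against the deployed schema first.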

What you think should happen instead?

No response

How to reproduce

  1. Start Airflow 2.11.0 (breeze start-airflow --executor CeleryExecutor --backend postgres --use-airflow-version 2.11.0).
  2. Trigger the DAG below so that its task enters the deferred state.
  3. Migrate to 3.1.0b2 (breeze start-airflow --executor CeleryExecutor --backend postgres).
  4. Check the triggerer logs for the error above.

from datetime import datetime
from airflow import DAG
from airflow.utils.dates import days_ago

from airflow.sensors.date_time import DateTimeSensorAsync


with DAG(
    "async_trigger_sleep",
    schedule_interval=None,
    start_date=days_ago(1),
    tags=["async_migration"],
) as dag:
    DateTimeSensorAsync(
        task_id="wait_for_start",
        target_time=datetime(2028, 12, 10),
    )

The migration works fine with the DAG below:

from airflow import DAG
from pendulum import today
from airflow.sensors.date_time import DateTimeSensorAsync

with DAG(
    "async_trigger_sleep",
    schedule=None,
    start_date=today("UTC").add(days=-1),
    tags=["async_migration"],
) as dag:
    DateTimeSensorAsync(
        task_id="wait_for_start",
        # wait for 1 minute from the DAG execution date
        target_time="{{ (execution_date + macros.timedelta(minutes=1)).isoformat() }}",
    )

Operating System

Linux

Versions of Apache Airflow Providers

No response

Deployment

Other

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
