Triggerer crashing with ImportError: cannot import name 'SUPERVISOR_COMMS' from 'airflow.sdk.execution_time.task_runner' #55026

@vatsrahul1001

Apache Airflow version

main (development)

If "Other Airflow 2 version" selected, which one?

No response

What happened?

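The triggerer crashes with the traceback below while expanding a mapped task's start trigger arguments (`expand_start_trigger_args` in `triggerer_job_runner.py`). The failure is raised from `LazyXComSequence.__len__` in `lazy_sequence.py`, which tries to import `SUPERVISOR_COMMS` from `airflow.sdk.execution_time.task_runner`.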


triggerer                                                                       ^^^^^^^^^^^^^^^^^
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/sdk/definitions/_internal/expandinput.py", line 151, in _get_length
triggerer     return get_task_map_length(v, resolved_vals[k], upstream_map_indexes)
triggerer            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
triggerer   File "/usr/lib64/python3.12/functools.py", line 912, in wrapper
triggerer     return dispatch(args[0].__class__)(*args, **kw)
triggerer            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/sdk/definitions/xcom_arg.py", line 591, in _
triggerer     return (upstream_map_indexes.get(task_id) or 1) * len(resolved_val)
triggerer                                                       ^^^^^^^^^^^^^^^^^
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/sdk/execution_time/lazy_sequence.py", line 90, in __len__
triggerer     from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
triggerer ImportError: cannot import name 'SUPERVISOR_COMMS' from 'airflow.sdk.execution_time.task_runner' (/usr/local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py)
triggerer [2025-08-28T08:21:30.608+0000] {triggerer_job_runner.py:178} INFO - Waiting for triggers to clean up
triggerer 2025-08-28 08:21:30 [info     [] Process exited                 [supervisor] exit_code=<Negsignal.SIGINT: -2> pid=21 signal_sent=SIGINT
triggerer [2025-08-28T08:21:30.709+0000] {triggerer_job_runner.py:183} INFO - Exited trigger loop
triggerer Traceback (most recent call last):
triggerer   File "/usr/local/bin/airflow", line 10, in <module>
triggerer     sys.exit(main())
triggerer              ^^^^^^
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/__main__.py", line 55, in main
triggerer     args.func(args)
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/cli/cli_config.py", line 49, in command
triggerer     return func(*args, **kwargs)
triggerer            ^^^^^^^^^^^^^^^^^^^^^
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/utils/cli.py", line 114, in wrapper
triggerer     return f(*args, **kwargs)
triggerer            ^^^^^^^^^^^^^^^^^^
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/utils/providers_configuration_loader.py", line 54, in wrapped_function
triggerer     return func(*args, **kwargs)
triggerer            ^^^^^^^^^^^^^^^^^^^^^
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/cli/commands/triggerer_command.py", line 69, in triggerer
triggerer     run_command_with_daemon_option(
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/cli/commands/daemon_utils.py", line 86, in run_command_with_daemon_option
triggerer     callback()
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/cli/commands/triggerer_command.py", line 72, in <lambda>
triggerer     callback=lambda: triggerer_run(args.skip_serve_logs, args.capacity, triggerer_heartrate),
triggerer                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/cli/commands/triggerer_command.py", line 55, in triggerer_run
triggerer     run_job(job=triggerer_job_runner.job, execute_callable=triggerer_job_runner._execute)
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/utils/session.py", line 100, in wrapper
triggerer     return func(*args, session=session, **kwargs)
triggerer            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/jobs/job.py", line 368, in run_job
triggerer     return execute_job(job, execute_callable=execute_callable)
triggerer            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/jobs/job.py", line 397, in execute_job
triggerer     ret = execute_callable()
triggerer           ^^^^^^^^^^^^^^^^^^
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 172, in _execute
triggerer     self.trigger_runner.run()
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 528, in run
triggerer     self.load_triggers()
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/traces/tracer.py", line 58, in wrapper
triggerer     return func(*args, **kwargs)
triggerer            ^^^^^^^^^^^^^^^^^^^^^
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 551, in load_triggers
triggerer     self.update_triggers(set(ids))
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 685, in update_triggers
triggerer     workload = create_workload(new_trigger_orm)
triggerer                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 626, in create_workload
triggerer     trigger = expand_start_trigger_args(trigger)
triggerer               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 616, in expand_start_trigger_args
triggerer     task.render_template_fields(context=context)
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/sdk/definitions/mappedoperator.py", line 806, in render_template_fields
triggerer     mapped_kwargs, seen_oids = self._expand_mapped_kwargs(context)
triggerer                                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/sdk/definitions/mappedoperator.py", line 699, in _expand_mapped_kwargs
triggerer     return self._get_specified_expand_input().resolve(context)
triggerer            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/sdk/definitions/_internal/expandinput.py", line 213, in resolve
triggerer     all_lengths = self._get_map_lengths(sized_resolved, upstream_map_indexes)
triggerer                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/sdk/definitions/_internal/expandinput.py", line 160, in _get_map_lengths
triggerer     k: res for k, v in self.value.items() if v is not None if (res := _get_length(k, v)) is not None
triggerer                                                                       ^^^^^^^^^^^^^^^^^
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/sdk/definitions/_internal/expandinput.py", line 151, in _get_length
triggerer     return get_task_map_length(v, resolved_vals[k], upstream_map_indexes)
triggerer            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
triggerer   File "/usr/lib64/python3.12/functools.py", line 912, in wrapper
triggerer     return dispatch(args[0].__class__)(*args, **kw)
triggerer            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/sdk/definitions/xcom_arg.py", line 591, in _
triggerer     return (upstream_map_indexes.get(task_id) or 1) * len(resolved_val)
triggerer                                                       ^^^^^^^^^^^^^^^^^
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/sdk/execution_time/lazy_sequence.py", line 90, in __len__
triggerer     from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
triggerer ImportError: cannot import name 'SUPERVISOR_COMMS' from 'airflow.sdk.execution_time.task_runner' (/usr/local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py)
triggerer INFO:     Waiting for child process [19]
triggerer INFO:     Child process [19] died
triggerer INFO:     Waiting for child process [20]
triggerer INFO:     Child process [20] died
triggerer INFO:     Received SIGTERM, exiting.
triggerer INFO:     Terminated child process [22]
triggerer INFO:     Terminated child process [39]
triggerer INFO:     Waiting for child process [22]
triggerer INFO:     Waiting for child process [39]
triggerer INFO:     Stopping parent process [17]
stream closed EOF for radiant-voltage-1751/radiant-voltage-1751-triggerer-6977c7c48-n4wdh (triggerer)
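
For anyone triaging this, here is a minimal sketch of how this kind of `ImportError` can arise. It assumes (not verified against the current `task_runner.py` on main) that `SUPERVISOR_COMMS` is only annotated at module level and is assigned a value later, when a task-runner process starts up. The module name `fake_task_runner` is a hypothetical stand-in, not Airflow code.

```python
import sys
import types

# Hypothetical stand-in module: the global is annotated but never assigned,
# so the name does not exist as a module attribute.
fake = types.ModuleType("fake_task_runner")
exec("SUPERVISOR_COMMS: object", fake.__dict__)
sys.modules["fake_task_runner"] = fake


def try_import() -> str:
    """Attempt the same style of function-local import seen in the traceback."""
    try:
        from fake_task_runner import SUPERVISOR_COMMS  # noqa: F401
    except ImportError as exc:
        return f"ImportError: {exc}"
    return "imported"


# In a process that never assigned the global (assumed: the triggerer).
before = try_import()

# In a process whose entry point assigns it at startup (assumed: the task runner).
fake.SUPERVISOR_COMMS = object()
after = try_import()

print(before)
print(after)
```

If that assumption holds, any code path that reaches `LazyXComSequence.__len__` outside a task-runner process would fail this way, which would match the triggerer resolving `expand()` inputs here.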

What you think should happen instead?

No response

How to reproduce

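Run the DAG below on main. The crash occurs once the triggerer picks up the mapped async sensor whose `target_time` is expanded from an upstream XCom (`expanded_datetime_async`).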

from datetime import datetime, timedelta
from time import sleep

from airflow.sdk import DAG
from airflow.decorators import task
from airflow.models.taskinstance import TaskInstance
from airflow.providers.standard.operators.python import PythonOperator
from airflow.providers.standard.sensors.date_time import DateTimeSensor, DateTimeSensorAsync
from airflow.providers.standard.sensors.time_delta import TimeDeltaSensor, TimeDeltaSensorAsync

delays = [30, 60, 90]


@task
def get_delays():
    return delays


@task
def get_wakes(delay, **context):
    "Wake {delay} seconds after the task starts"
    ti: TaskInstance = context["ti"]
    return (ti.start_date + timedelta(seconds=delay)).isoformat()


with DAG(
    dag_id="datetime_mapped",
    start_date=datetime(1970, 1, 1),
    schedule=None,
    tags=["taskmap"],
) as dag:

    wake_times = get_wakes.expand(delay=get_delays())

    DateTimeSensor.partial(task_id="expanded_datetime").expand(target_time=wake_times)
    TimeDeltaSensor.partial(task_id="expanded_timedelta").expand(
        delta=list(map(lambda x: timedelta(seconds=x), [30, 60, 90]))
    )

    DateTimeSensorAsync.partial(task_id="expanded_datetime_async").expand(
        target_time=wake_times
    )
    TimeDeltaSensorAsync.partial(task_id="expanded_timedelta_async").expand(
        delta=list(map(lambda x: timedelta(seconds=x), [30, 60, 90]))
    )

    TimeDeltaSensor(task_id="static_timedelta", delta=timedelta(seconds=90))
    DateTimeSensor(
        task_id="static_datetime",
        target_time="{{macros.datetime.now() + macros.timedelta(seconds=90)}}",
    )

    PythonOperator(task_id="op_sleep_90", python_callable=lambda: sleep(90))

Operating System

linux

Versions of Apache Airflow Providers

No response

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
