Skip to content

Conversation

@kaxil
Copy link
Member

@kaxil kaxil commented Aug 28, 2025

Fixes #55026

PR #53071 introduced a fundamental architectural issue by making the Triggerer supervisor process load DAGs and perform template rendering.

Error:

triggerer                                                                       ^^^^^^^^^^^^^^^^^
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/sdk/definitions/_internal/expandinput.py", line 151, in _get_length
triggerer     return get_task_map_length(v, resolved_vals[k], upstream_map_indexes)
triggerer            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
triggerer   File "/usr/lib64/python3.12/functools.py", line 912, in wrapper
triggerer     return dispatch(args[0].__class__)(*args, **kw)
triggerer            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/sdk/definitions/xcom_arg.py", line 591, in _
triggerer     return (upstream_map_indexes.get(task_id) or 1) * len(resolved_val)
triggerer                                                       ^^^^^^^^^^^^^^^^^
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/sdk/execution_time/lazy_sequence.py", line 90, in __len__
triggerer     from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
triggerer ImportError: cannot import name 'SUPERVISOR_COMMS' from 'airflow.sdk.execution_time.task_runner' (/usr/local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py)
triggerer [2025-08-28T08:21:30.608+0000] {triggerer_job_runner.py:178} INFO - Waiting for triggers to clean up
triggerer 2025-08-28 08:21:30 [info     [] Process exited                 [supervisor] exit_code=<Negsignal.SIGINT: -2> pid=21 signal_sent=SIGINT
triggerer [2025-08-28T08:21:30.709+0000] {triggerer_job_runner.py:183} INFO - Exited trigger loop
triggerer Traceback (most recent call last):
triggerer   File "/usr/local/bin/airflow", line 10, in <module>
triggerer     sys.exit(main())
triggerer              ^^^^^^
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/__main__.py", line 55, in main
triggerer     args.func(args)
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/cli/cli_config.py", line 49, in command
triggerer     return func(*args, **kwargs)
triggerer            ^^^^^^^^^^^^^^^^^^^^^
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/utils/cli.py", line 114, in wrapper
triggerer     return f(*args, **kwargs)
triggerer            ^^^^^^^^^^^^^^^^^^
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/utils/providers_configuration_loader.py", line 54, in wrapped_function
triggerer     return func(*args, **kwargs)
triggerer            ^^^^^^^^^^^^^^^^^^^^^
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/cli/commands/triggerer_command.py", line 69, in triggerer
triggerer     run_command_with_daemon_option(
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/cli/commands/daemon_utils.py", line 86, in run_command_with_daemon_option
triggerer     callback()
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/cli/commands/triggerer_command.py", line 72, in <lambda>
triggerer     callback=lambda: triggerer_run(args.skip_serve_logs, args.capacity, triggerer_heartrate),
triggerer                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/cli/commands/triggerer_command.py", line 55, in triggerer_run
triggerer     run_job(job=triggerer_job_runner.job, execute_callable=triggerer_job_runner._execute)
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/utils/session.py", line 100, in wrapper
triggerer     return func(*args, session=session, **kwargs)
triggerer            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/jobs/job.py", line 368, in run_job
triggerer     return execute_job(job, execute_callable=execute_callable)
triggerer            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/jobs/job.py", line 397, in execute_job
triggerer     ret = execute_callable()
triggerer           ^^^^^^^^^^^^^^^^^^
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 172, in _execute
triggerer     self.trigger_runner.run()
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 528, in run
triggerer     self.load_triggers()
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/traces/tracer.py", line 58, in wrapper
triggerer     return func(*args, **kwargs)
triggerer            ^^^^^^^^^^^^^^^^^^^^^
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 551, in load_triggers
triggerer     self.update_triggers(set(ids))
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 685, in update_triggers
triggerer     workload = create_workload(new_trigger_orm)
triggerer                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 626, in create_workload
triggerer     trigger = expand_start_trigger_args(trigger)
triggerer               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 616, in expand_start_trigger_args
triggerer     task.render_template_fields(context=context)
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/sdk/definitions/mappedoperator.py", line 806, in render_template_fields
triggerer     mapped_kwargs, seen_oids = self._expand_mapped_kwargs(context)
triggerer                                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/sdk/definitions/mappedoperator.py", line 699, in _expand_mapped_kwargs
triggerer     return self._get_specified_expand_input().resolve(context)
triggerer            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/sdk/definitions/_internal/expandinput.py", line 213, in resolve
triggerer     all_lengths = self._get_map_lengths(sized_resolved, upstream_map_indexes)
triggerer                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/sdk/definitions/_internal/expandinput.py", line 160, in _get_map_lengths
triggerer     k: res for k, v in self.value.items() if v is not None if (res := _get_length(k, v)) is not None
triggerer                                                                       ^^^^^^^^^^^^^^^^^
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/sdk/definitions/_internal/expandinput.py", line 151, in _get_length
triggerer     return get_task_map_length(v, resolved_vals[k], upstream_map_indexes)
triggerer            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
triggerer   File "/usr/lib64/python3.12/functools.py", line 912, in wrapper
triggerer     return dispatch(args[0].__class__)(*args, **kw)
triggerer            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/sdk/definitions/xcom_arg.py", line 591, in _
triggerer     return (upstream_map_indexes.get(task_id) or 1) * len(resolved_val)
triggerer                                                       ^^^^^^^^^^^^^^^^^
triggerer   File "/usr/local/lib/python3.12/site-packages/airflow/sdk/execution_time/lazy_sequence.py", line 90, in __len__
triggerer     from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
triggerer ImportError: cannot import name 'SUPERVISOR_COMMS' from 'airflow.sdk.execution_time.task_runner' (/usr/local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py)
triggerer INFO:     Waiting for child process [19]
triggerer INFO:     Child process [19] died
triggerer INFO:     Waiting for child process [20]
triggerer INFO:     Child process [20] died
triggerer INFO:     Received SIGTERM, exiting.
triggerer INFO:     Terminated child process [22]
triggerer INFO:     Terminated child process [39]
triggerer INFO:     Waiting for child process [22]
triggerer INFO:     Waiting for child process [39]
triggerer INFO:     Stopping parent process [17]
stream closed EOF for radiant-voltage-1751/radiant-voltage-1751-triggerer-6977c7c48-n4wdh (triggerer)

To Reproduce, run the following dag:

from datetime import datetime, timedelta
from time import sleep

from airflow.sdk import DAG
from airflow.decorators import task
from airflow.models.taskinstance import TaskInstance
from airflow.providers.standard.operators.python import PythonOperator
from airflow.providers.standard.sensors.date_time import DateTimeSensor, DateTimeSensorAsync
from airflow.providers.standard.sensors.time_delta import TimeDeltaSensor, TimeDeltaSensorAsync

delays = [30, 60, 90]


@task
def get_delays():
    return delays


@task
def get_wakes(delay, **context):
    "Wake {delay} seconds after the task starts"
    ti: TaskInstance = context["ti"]
    return (ti.start_date + timedelta(seconds=delay)).isoformat()


with DAG(
    dag_id="datetime_mapped",
    start_date=datetime(1970, 1, 1),
    schedule=None,
    tags=["taskmap"] 
) as dag:

    wake_times = get_wakes.expand(delay=get_delays())

    DateTimeSensor.partial(task_id="expanded_datetime").expand(target_time=wake_times)
    TimeDeltaSensor.partial(task_id="expanded_timedelta").expand(
        delta=list(map(lambda x: timedelta(seconds=x), [30, 60, 90]))
    )

    DateTimeSensorAsync.partial(task_id="expanded_datetime_async").expand(
        target_time=wake_times
    )
    TimeDeltaSensorAsync.partial(task_id="expanded_timedelta_async").expand(
        delta=list(map(lambda x: timedelta(seconds=x), [30, 60, 90]))
    )

    TimeDeltaSensor(task_id="static_timedelta", delta=timedelta(seconds=90))
    DateTimeSensor(
        task_id="static_datetime",
        target_time="{{macros.datetime.now() + macros.timedelta(seconds=90)}}",
    )

    PythonOperator(task_id="op_sleep_90", python_callable=lambda: sleep(90))

Issues Caused because of it:

  1. Import errors with mapped operators when rendering fields: ImportError: cannot import name 'SUPERVISOR_COMMS'
  2. Architectural violation: Triggerer trying to do task-level operations
  3. Masked by tests: All tests mocked DagBag, hiding the real serialization issues
  4. Docs: Exposed internal StartTriggerArgs in Task SDK documentation

Root Cause:
The Triggerer supervisor process cannot access: XCom data (requires API calls). This can work in Trigger subprocess but not the supervisor process.

When mapped operators try to expand trigger args using lazy sequences, they fail because the architectural assumptions are violated.


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.

This reverts commit 4362c25 from apache#53071.

The original change violated Airflow's architectural separation by making
the Triggerer supervisor process load DAGs and attempt template rendering.
This breaks with mapped operators that use lazy sequences requiring
SUPERVISOR_COMMS, which only exists in task execution context.

The Triggerer should operate purely on trigger database records and
pre-computed trigger kwargs, not attempt runtime DAG operations.

Fixes triggerer failures with mapped operators using XCom dependencies.
Copy link
Contributor

@eladkal eladkal left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if there is easy way to add a test to prevent breaking this in the future

@kaxil kaxil requested a review from uranusjr as a code owner August 28, 2025 20:07
@kaxil kaxil merged commit 5d1d956 into apache:main Aug 28, 2025
107 checks passed
@kaxil kaxil deleted the revert-53071 branch August 28, 2025 20:58
mangal-vairalkar pushed a commit to mangal-vairalkar/airflow that referenced this pull request Aug 30, 2025
…ache#55037)

This reverts commit 4362c25 from apache#53071.

The original change violated Airflow's architectural separation by making
the Triggerer supervisor process load DAGs and attempt template rendering.
This breaks with mapped operators that use lazy sequences requiring
SUPERVISOR_COMMS, which only exists in task execution context.

The Triggerer should operate purely on trigger database records and
pre-computed trigger kwargs, not attempt runtime DAG operations.

Fixes triggerer failures with mapped operators using XCom dependencies.
nothingmin pushed a commit to nothingmin/airflow that referenced this pull request Sep 2, 2025
…ache#55037)

This reverts commit 4362c25 from apache#53071.

The original change violated Airflow's architectural separation by making
the Triggerer supervisor process load DAGs and attempt template rendering.
This breaks with mapped operators that use lazy sequences requiring
SUPERVISOR_COMMS, which only exists in task execution context.

The Triggerer should operate purely on trigger database records and
pre-computed trigger kwargs, not attempt runtime DAG operations.

Fixes triggerer failures with mapped operators using XCom dependencies.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Triggerer crashing with ImportError: cannot import name 'SUPERVISOR_COMMS' from 'airflow.sdk.execution_time.task_runner'

5 participants