dag processor crashes trying to process callback request for another bundle #57081

@tirkarthi

Description

Apache Airflow version

main (development)

If "Other Airflow 2/3 version" selected, which one?

No response

What happened?

Our setup shards the dag processor so that each instance handles a different folder. A dag processor started with a bundle name via -B holds references only to that bundle. When it picks up a callback request belonging to another bundle, it crashes while trying to render the log file name.
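
For illustration, a minimal sketch of the lookup pattern that fails. Bundle and DagFile below are simplified stand-ins for the real Airflow objects, and the guarded variant at the end is only one possible way to avoid the StopIteration, not necessarily the intended fix:

from dataclasses import dataclass

@dataclass
class Bundle:
    name: str

@dataclass
class DagFile:
    bundle_name: str

# With -B dags-folder-2 the processor only knows about that bundle ...
dag_bundles = [Bundle("dags-folder-2")]
# ... but the callback it fetched references a file from dags-folder-1.
dag_file = DagFile("dags-folder-1")

# Same pattern as _render_log_filename: next() without a default
# raises StopIteration when no bundle name matches.
try:
    bundle = next(b for b in dag_bundles if b.name == dag_file.bundle_name)
except StopIteration:
    print("no bundle named", dag_file.bundle_name)

# A guarded lookup would at least not kill the whole parsing loop:
bundle = next((b for b in dag_bundles if b.name == dag_file.bundle_name), None)
if bundle is None:
    print("skipping file from unknown bundle", dag_file.bundle_name)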

What you think should happen instead?

No response

How to reproduce

  1. Create dag_1.py in /home/karthikeyan/airflow/dags1
  2. Create dag_2.py in /home/karthikeyan/airflow/dags2
  3. Start two dag processors, one per bundle: airflow dag-processor -vvv -B dags-folder-1 and airflow dag-processor -vvv -B dags-folder-2
  4. Let the dags appear in the UI.
  5. Stop both dag processors.
  6. Trigger dag_1 from the UI so that its success callback is written to the callback table in the db.
  7. Start only the dags-folder-2 dag processor; it fetches the dag_1 callback and crashes while trying to process it, because self._dag_bundles contains only the bundle passed via -B (see the snippet after the callback row below).

mysql> select * from callback;
+----------------------------+-----------------+---------------+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------+--------+------------+----------------------------------+
| created_at                 | priority_weight | type          | fetch_method  | data                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     | state | output | trigger_id | id                               |
+----------------------------+-----------------+---------------+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------+--------+------------+----------------------------------+
| 2025-10-22 21:42:18.735975 |               1 | dag_processor | dag_attribute | {"__var": {"req_data": "{\"filepath\":\"dag_1.py\",\"bundle_name\":\"dags-folder-1\",\"bundle_version\":null,\"msg\":null,\"ti\":{\"id\":\"019a0cb0-6cff-76dc-ae43-315ebcbd3de2\",\"task_id\":\"task1\",\"dag_id\":\"dag_1\",\"run_id\":\"manual__2025-10-22T16:11:19+00:00\",\"try_number\":1,\"dag_version_id\":\"019a0ca8-606b-7fdc-bf38-0a0e59af7c03\",\"map_index\":-1,\"hostname\":\"laptop\",\"context_carrier\":null},\"task_callback_type\":\"success\",\"context_from_server\":null,\"type\":\"TaskCallbackRequest\"}", "req_class": "TaskCallbackRequest"}, "__type": "dict"} | NULL  | NULL   |       NULL | 019a0cb151af773488c25c6afd5b57c2 |
+----------------------------+-----------------+---------------+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------+--------+------------+----------------------------------+
1 row in set (0.00 sec)
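
For reference, a small snippet that decodes the data column of the row above (abridged to the relevant fields) and shows that the callback belongs to dags-folder-1, a bundle the running dags-folder-2 processor never loaded:

import json

# "data" column of the callback row above, abridged to the fields that matter.
data = {
    "__var": {
        "req_data": json.dumps({
            "filepath": "dag_1.py",
            "bundle_name": "dags-folder-1",
            "type": "TaskCallbackRequest",
        }),
        "req_class": "TaskCallbackRequest",
    },
    "__type": "dict",
}

req = json.loads(data["__var"]["req_data"])
# The request points at dags-folder-1, but the processor was started with
# -B dags-folder-2, so its self._dag_bundles never contains that bundle.
print(req["bundle_name"])  # dags-folder-1

The dags-folder-2 processor then crashes with:
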
[2025-10-22T16:06:49.847784Z] {dag_processor_job_runner.py:63} ERROR - Exception when executing DagProcessorJob
Traceback (most recent call last):
  File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/jobs/dag_processor_job_runner.py", line 61, in _execute
    self.processor.run()
  File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/dag_processing/manager.py", line 272, in run
    return self._run_parsing_loop()
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/dag_processing/manager.py", line 357, in _run_parsing_loop
    self._start_new_processes()
  File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/dag_processing/manager.py", line 929, in _start_new_processes
    processor = self._create_process(file)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/dag_processing/manager.py", line 908, in _create_process
    logger, logger_filehandle = self._get_logger_for_dag_file(dag_file)
                                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/dag_processing/manager.py", line 886, in _get_logger_for_dag_file
    log_filename = self._render_log_filename(dag_file)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/dag_processing/manager.py", line 881, in _render_log_filename
    bundle = next(b for b in self._dag_bundles if b.name == dag_file.bundle_name)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
StopIteration
[2025-10-22T16:06:49.866068Z] {supervisor.py:712} INFO - Process exited pid=17796 exit_code=<Negsignal.SIGTERM: -15> signal_sent=SIGTERM
[2025-10-22T16:06:49.877181Z] {process_utils.py:285} INFO - Waiting up to 5 seconds for processes to exit...
[2025-10-22T16:06:49.885516Z] {listener.py:37} DEBUG - Calling 'before_stopping' with {'component': <airflow.jobs.job.Job object at 0x7f6408975f50>}
[2025-10-22T16:06:49.885927Z] {listener.py:38} DEBUG - Hook impls: []
[2025-10-22T16:06:49.886071Z] {listener.py:42} DEBUG - Result from 'before_stopping': []
[2025-10-22T16:06:49.896775Z] {cli_action_loggers.py:98} DEBUG - Calling callbacks: []
Traceback (most recent call last):
  File "/home/karthikeyan/stuff/python/airflow/.venv/bin/airflow", line 10, in <module>
    sys.exit(main())
             ^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/__main__.py", line 55, in main
    args.func(args)
  File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/utils/cli.py", line 115, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/utils/providers_configuration_loader.py", line 54, in wrapped_function
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/cli/commands/dag_processor_command.py", line 53, in dag_processor
    run_command_with_daemon_option(
  File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/cli/commands/daemon_utils.py", line 86, in run_command_with_daemon_option
    callback()
  File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/cli/commands/dag_processor_command.py", line 56, in <lambda>
    callback=lambda: run_job(job=job_runner.job, execute_callable=job_runner._execute),
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/utils/session.py", line 100, in wrapper
    return func(*args, session=session, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/jobs/job.py", line 367, in run_job
    return execute_job(job, execute_callable=execute_callable)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/jobs/job.py", line 396, in execute_job
    ret = execute_callable()
          ^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/jobs/dag_processor_job_runner.py", line 61, in _execute
    self.processor.run()
  File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/dag_processing/manager.py", line 272, in run
    return self._run_parsing_loop()
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/dag_processing/manager.py", line 357, in _run_parsing_loop
    self._start_new_processes()
  File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/dag_processing/manager.py", line 929, in _start_new_processes
    processor = self._create_process(file)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/dag_processing/manager.py", line 908, in _create_process
    logger, logger_filehandle = self._get_logger_for_dag_file(dag_file)
                                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/dag_processing/manager.py", line 886, in _get_logger_for_dag_file
    log_filename = self._render_log_filename(dag_file)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/dag_processing/manager.py", line 881, in _render_log_filename
    bundle = next(b for b in self._dag_bundles if b.name == dag_file.bundle_name)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
StopIteration

Trigger and operator

# cat ~/airflow/plugins/custom_trigger.py 

import asyncio

from airflow.sdk import BaseOperator
from airflow.triggers.base import BaseTrigger, TriggerEvent, TaskSuccessEvent


class CustomTrigger(BaseTrigger):
    def __init__(self):
        super().__init__()

    def serialize(self):
        return (self.__class__.__module__ + "." + self.__class__.__name__, {})

    async def run(self):
        yield TaskSuccessEvent()


class CustomOperator(BaseOperator):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def execute(self, context, current_item_index=0, event=None):
        self.defer(
            trigger=CustomTrigger(),
            method_name="execute_complete",
        )

    def execute_complete(self, *args, **kwargs):
        pass

# cat ~/airflow/dags1/dag_1.py 

from datetime import datetime
from airflow.sdk import DAG
from airflow.providers.standard.operators.bash import BashOperator
from custom_trigger import CustomOperator


with DAG(
    dag_id="dag_1",
    schedule="@continuous",
    max_active_runs=1,
    catchup=False
):
    task1 = CustomOperator(task_id="task1")

# cat ~/airflow/dags2/dag_2.py 

from datetime import datetime
from airflow.sdk import DAG
from airflow.providers.standard.operators.bash import BashOperator
from custom_trigger import CustomOperator


with DAG(
    dag_id="dag_2",
    schedule="@continuous",
    max_active_runs=1,
    catchup=False
):
    task1 = CustomOperator(task_id="task1")

airflow.cfg

[dag_processor]
dag_bundle_config_list = [
    {
      "name": "dags-folder-1",
      "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
      "kwargs": {"path": "/home/karthikeyan/airflow/dags1"}
    },
    {
      "name": "dags-folder-2",
      "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
      "kwargs": {"path": "/home/karthikeyan/airflow/dags2"}
    }
   ]
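
As a side note (illustration only), the value passed to -B has to match one of the "name" fields in this list; a quick way to see the configured names:

import json

# Same JSON list as dag_bundle_config_list above.
bundles = json.loads("""
[
  {"name": "dags-folder-1",
   "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
   "kwargs": {"path": "/home/karthikeyan/airflow/dags1"}},
  {"name": "dags-folder-2",
   "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
   "kwargs": {"path": "/home/karthikeyan/airflow/dags2"}}
]
""")
print([b["name"] for b in bundles])  # ['dags-folder-1', 'dags-folder-2']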

Operating System

Ubuntu 20.04

Versions of Apache Airflow Providers

No response

Deployment

Virtualenv installation

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct
