Description
Apache Airflow Provider(s)
standard
Versions of Apache Airflow Providers
apache-airflow-providers-standard==1.9.0
Apache Airflow version
3.1.0
Operating System
AlmaLinux 9.6
Deployment
Other Docker-based deployment
Deployment details
The deployment runs in Docker Swarm with mounts on GlusterFS, although this shouldn't matter for the issue, as it also happens in Docker Compose with local mounts. I couldn't test other deployments.
What happened
When using the default DAG bundle, which maps the old /dags folder, importing from another module inside the callable passed to ExternalPythonOperator or PythonVirtualenvOperator makes the task fail with ModuleNotFoundError: No module named '...'.
Note that this does not happen with PythonOperator. My guess is that it only affects these two operators because they run in a subprocess and the dags/ folder is no longer part of PYTHONPATH, as per issue #53617.
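The guess above can be illustrated outside Airflow. This is a minimal sketch, not Airflow's actual code path: it creates a throwaway package (the name demo_pkg_for_issue and the helper child_can_import are hypothetical), makes it importable in the parent process through sys.path, and then checks whether a fresh interpreter started with subprocess can import it with and without PYTHONPATH.

```python
import os
import subprocess
import sys
import tempfile
from pathlib import Path
from typing import Optional


def child_can_import(module: str, extra_path: Optional[str] = None) -> bool:
    """Return True if a freshly started interpreter can import `module`."""
    env = dict(os.environ)
    env.pop("PYTHONPATH", None)  # the child starts without any inherited PYTHONPATH
    if extra_path is not None:
        env["PYTHONPATH"] = extra_path
    result = subprocess.run(
        [sys.executable, "-c", f"import {module}"],
        env=env,
        capture_output=True,
    )
    return result.returncode == 0


with tempfile.TemporaryDirectory() as tmp:
    # Hypothetical package standing in for a module inside the dags/ folder.
    pkg = Path(tmp) / "demo_pkg_for_issue"
    pkg.mkdir()
    (pkg / "__init__.py").write_text("")

    # The parent process can import it once the folder is on sys.path ...
    sys.path.insert(0, tmp)
    import demo_pkg_for_issue  # noqa: F401

    # ... but sys.path changes are not inherited by a subprocess.
    without_path = child_can_import("demo_pkg_for_issue")
    with_path = child_can_import("demo_pkg_for_issue", extra_path=tmp)
    sys.path.remove(tmp)
```

Here without_path reflects the failing case (the child has no way to find the package) and with_path the case where the folder is handed to the child through PYTHONPATH.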
What you think should happen instead
I understand that DAG bundles may not have an AF2-style DAG structure. However, the default bundle does map to the dags/ folder, so I expect no regressions when migrating if we are still just using the dags/ folder with LocalDagBundle. This issue does not happen in Airflow 2.10 (the minimum version for the standard provider).
As for why the imports are done inside a function: the external operators often rely on packages that are not available in the standard environment. If you don't import them lazily, DAG parsing fails with a message that some package is not present.
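A small sketch of the lazy-import pattern described above, independent of Airflow. The package name package_only_in_task_venv is hypothetical and stands for anything installed only in the task's virtual environment. Defining the function succeeds even though the package is missing, because the import statement is only executed when the function is called.

```python
def task_callable():
    # Resolved only when the function runs, i.e. inside the task's
    # environment, not while the DAG file is being parsed.
    import package_only_in_task_venv  # hypothetical, not installed here

    return package_only_in_task_venv.__name__


# Reaching this line means the definition was accepted without the package.
definition_ok = callable(task_callable)

# Calling the function in an environment that lacks the package fails.
try:
    task_callable()
    call_failed = False
except ModuleNotFoundError:
    call_failed = True
```

With a top-level import instead, the same ModuleNotFoundError would be raised as soon as the file is loaded, which is what breaks DAG parsing.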
How to reproduce
I uploaded a small DAG with a bunch of examples to https://github.com/gfreitash/airflow-standard-operators-import-bug.
The structure is:
bug_test_dag/
├── __init__.py
├── lib/ # Module to be imported in subfolder
│ ├── __init__.py
│ └── helper.py
├── root_module.py # Module to be imported in the same folder
└── bug_test_dag.py # Main DAG definition
It must be placed inside /opt/airflow/dags and requires a virtual environment at /opt/airflow/venvs/bug_test_dag/task_venv/.
I did not test all operators, but it is plausible that this affects more than just ExternalPythonOperator and PythonVirtualenvOperator.
Anything else
For example:
def test_import_in_external_python_operator():
    """
    This function runs in ExternalPythonOperator (subprocess).
    The import fails in Airflow 3.x because the subprocess
    doesn't have the dags folder path in PYTHONPATH.
    """
    # Import inside the function
    from bug_test_dag.lib.helper import get_submodule_message

    message = get_submodule_message()
    print(f"SUCCESS: {message}")
    print("ExternalPythonOperator can import from DAG bundle")
    return message


# This task will fail with a ModuleNotFoundError
external_python_task = ExternalPythonOperator(
    task_id="test_external_python_operator",
    python=f"/opt/airflow/venvs/{DAG_ID}/task_venv/bin/python",
    python_callable=test_import_in_external_python_operator,
    dag=dag,
)
This gives the following result:
[2025-10-17 14:14:42] INFO - DAG bundles loaded: dags-folder source=airflow.dag_processing.bundles.manager.DagBundlesManager loc=manager.py:179
[2025-10-17 14:14:42] INFO - Filling up the DagBag from /opt/airflow/dags/bug_test_dag/bug_test_dag.py source=airflow.models.dagbag.DagBag loc=dagbag.py:593
[2025-10-17 14:14:43] INFO - Executing cmd: /opt/airflow/venvs/bug_test_dag/task_venv/bin/python /tmp/venv-callucs8bo5q/script.py /tmp/venv-callucs8bo5q/script.in /tmp/venv-callucs8bo5q/script.out /tmp/venv-callucs8bo5q/string_args.txt /tmp/venv-callucs8bo5q/termination.log /tmp/venv-callucs8bo5q/airflow_context.json source=airflow.utils.process_utils loc=process_utils.py:188
[2025-10-17 14:14:43] INFO - Output: source=airflow.utils.process_utils loc=process_utils.py:192
[2025-10-17 14:14:50] INFO - Traceback (most recent call last): source=airflow.utils.process_utils loc=process_utils.py:196
[2025-10-17 14:14:50] INFO - File "/tmp/venv-callucs8bo5q/script.py", line 61, in <module> source=airflow.utils.process_utils loc=process_utils.py:196
[2025-10-17 14:14:50] INFO - res = test_import_in_external_python_operator(*arg_dict["args"], **arg_dict["kwargs"]) source=airflow.utils.process_utils loc=process_utils.py:196
[2025-10-17 14:14:50] INFO - ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ source=airflow.utils.process_utils loc=process_utils.py:196
[2025-10-17 14:14:50] INFO - File "/tmp/venv-callucs8bo5q/script.py", line 30, in test_import_in_external_python_operator source=airflow.utils.process_utils loc=process_utils.py:196
[2025-10-17 14:14:50] INFO - from bug_test_dag.lib.helper import get_submodule_message source=airflow.utils.process_utils loc=process_utils.py:196
[2025-10-17 14:14:50] INFO - ModuleNotFoundError: No module named 'bug_test_dag' source=airflow.utils.process_utils loc=process_utils.py:196
[2025-10-17 14:14:50] ERROR - Task failed with exception source=task loc=task_runner.py:972
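If the missing sys.path entry is indeed the cause, one possible stopgap is to put the dags folder back on sys.path inside the callable before the lazy import. The sketch below is an assumption, not a verified fix and not something I ran inside Airflow: the function name import_helper_with_dags_folder is hypothetical, the temporary directory stands in for /opt/airflow/dags, and the body of get_submodule_message is a stand-in, not the one from the repository.

```python
import tempfile
from pathlib import Path


def import_helper_with_dags_folder(dags_folder):
    """Sketch of a task callable that restores the dags folder on sys.path."""
    # Everything the callable needs is imported inside the function body,
    # since the body is what ends up running in the subprocess.
    import sys

    if dags_folder not in sys.path:
        sys.path.insert(0, dags_folder)

    from bug_test_dag.lib.helper import get_submodule_message

    return get_submodule_message()


# Throwaway copy of the layout from "How to reproduce" (stand-in for /opt/airflow/dags).
with tempfile.TemporaryDirectory() as fake_dags:
    lib = Path(fake_dags) / "bug_test_dag" / "lib"
    lib.mkdir(parents=True)
    (lib.parent / "__init__.py").write_text("")
    (lib / "__init__.py").write_text("")
    (lib / "helper.py").write_text(
        "def get_submodule_message():\n    return 'hello from helper'\n"
    )
    message = import_helper_with_dags_folder(fake_dags)
```

In a real task the folder would presumably be passed to the callable as an argument (for example through op_kwargs). This only hides the regression per task, so it does not replace a fix in the operators themselves.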
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct