@amoghrajesh

closes: #47349

Passing executor_config with pod_override to CE (CeleryExecutor) makes no sense; it is generally a config for KE (KubernetesExecutor), EDGE, etc. In AF 3, when executor_config is passed to an executor, it is stored on the TI model, serialised as part of the workload, and sent to the supervisor to run. It never needs to be passed down to the supervise function, because these settings act as "decorators" that set the "environment" in which the task executes. Example: KE uses them to pick the right image, volumes, etc.
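To illustrate the split described above, here is a minimal sketch in plain Python. Every name in it (`choose_image`, `run_task`, `launch`, `DEFAULT_IMAGE`, and the `"image"` key) is hypothetical and is not an Airflow API. It only shows that the executor side can read executor_config to prepare the environment, while the function that runs the task never receives it.

```python
from typing import Optional

DEFAULT_IMAGE = "example/worker:latest"  # hypothetical default image name


def choose_image(executor_config: Optional[dict]) -> str:
    """Executor-side step: derive the environment from executor_config."""
    if not executor_config:
        return DEFAULT_IMAGE
    return executor_config.get("image", DEFAULT_IMAGE)


def run_task(task_id: str) -> str:
    """Task-side step: runs the task and never sees executor_config."""
    return f"ran {task_id}"


def launch(task_id: str, executor_config: Optional[dict] = None):
    image = choose_image(executor_config)  # environment is decided here
    result = run_task(task_id)             # executor_config is not forwarded
    return image, result
```

An executor that has no use for such settings could skip the `choose_image` step entirely without changing how the task itself runs.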

This change ignores executor_config when the workload is serialised.
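The idea can be sketched with the standard library alone. This is not the code from this PR: `TaskWorkload`, `FakePod`, `_EXCLUDED`, and `to_json` are hypothetical names, and `FakePod` merely stands in for an object (such as a pod_override value) that a JSON encoder cannot handle. The sketch assumes the problem is a field that cannot be encoded, and shows that leaving the field out of the payload avoids the failure.

```python
import json
from dataclasses import dataclass, fields
from typing import Optional


class FakePod:
    """Stand-in for an object that json cannot encode."""


@dataclass
class TaskWorkload:
    task_id: str
    dag_id: str
    executor_config: Optional[dict] = None

    # Unannotated, so this is a class attribute and not a dataclass field.
    _EXCLUDED = frozenset({"executor_config"})

    def to_json(self) -> str:
        # Serialise every field except the excluded ones.
        payload = {
            f.name: getattr(self, f.name)
            for f in fields(self)
            if f.name not in self._EXCLUDED
        }
        return json.dumps(payload)


workload = TaskWorkload(
    task_id="virtualenv_python",
    dag_id="virtualenv_python_operator",
    executor_config={"pod_override": FakePod()},
)

# Naive serialisation of all fields fails on the non-encodable object.
try:
    json.dumps(vars(workload))
    naive_failed = False
except TypeError:
    naive_failed = True

# Serialisation that skips executor_config succeeds.
serialised = workload.to_json()
```

The object itself still carries `executor_config`, so code that legitimately needs it can read it; only the serialised payload omits it.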

Testing:

  • Used the same DAG as earlier:
from airflow.models import DAG
from airflow.providers.standard.operators.python import PythonVirtualenvOperator
from pendulum import today
from kubernetes.client import models as k8s

def callable_virtualenv():
    """
    Example function that will be performed in a virtual environment.

    Importing inside the function, rather than at the module level, ensures that it
    will not attempt to import the library before it is installed.
    """
    from time import sleep

    from colorama import Back, Fore, Style

    print(Fore.RED + "some red text")
    print(Back.GREEN + "and with a green background")
    print(Style.DIM + "and in dim text")
    print(Style.RESET_ALL)
    for _ in range(10):
        print(Style.DIM + "Please wait...", flush=True)
        sleep(10)
    print("Finished")


with DAG(
    dag_id="virtualenv_python_operator",
    default_args={"owner": "airflow"},
    schedule=None,
    start_date=today('UTC').add(days=-2),
    tags=["core"],
) as dag:

    task = PythonVirtualenvOperator(
        task_id="virtualenv_python",
        python_callable=callable_virtualenv,
        requirements=["colorama==0.4.0"],
        system_site_packages=False,
        executor_config={
            "pod_override": k8s.V1Pod(
                spec=k8s.V1PodSpec(
                    containers=[
                        k8s.V1Container(
                            name="base",
                            resources=k8s.V1ResourceRequirements(
                                requests={
                                    "cpu": "100m",
                                    "memory": "384Mi",
                                },
                                limits={
                                    "cpu": 1,
                                    "memory": "500Mi",
                                }
                            )
                        )
                    ]
                )
            )
        }
    )

Logs:

::group::Log message source details sources=["/root/airflow/logs/dag_id=virtualenv_python_operator/run_id=manual__2025-03-05T07:31:42.634180+00:00_Vfvn3SQt/task_id=virtualenv_python/attempt=1.log"]

::endgroup::

[2025-03-05T07:31:44.480947Z] DEBUG - Loading plugins logger="airflow.plugins_manager"

[2025-03-05T07:31:44.481644Z] DEBUG - Loading plugins from directory: /files/plugins logger="airflow.plugins_manager"

[2025-03-05T07:31:44.481732Z] DEBUG - Note: Loading plugins from examples as well: /files/plugins logger="airflow.plugins_manager"

[2025-03-05T07:31:44.485294Z] WARNING - /files/plugins/my_xcom.py:13: SAWarning: This declarative base already contains a class with the same class name and module name as my_xcom.JSONFileXComBackend, and will be replaced in the string-lookup table. class JSONFileXComBackend(BaseXCom): logger="py.warnings"

[2025-03-05T07:31:44.487014Z] ERROR - Failed to import plugin /files/plugins/my_plugin.py logger="airflow.plugins_manager" error_detail=[{"exc_type":"ModuleNotFoundError","exc_value":"No module named 'airflow.sdk.definitions.baseoperatorlink'","syntax_error":null,"is_cause":false,"frames":[{"filename":"/opt/airflow/airflow/plugins_manager.py","lineno":290,"name":"load_plugins_from_plugin_directory"},{"filename":"<frozen importlib._bootstrap_external>","lineno":850,"name":"exec_module"},{"filename":"<frozen importlib._bootstrap>","lineno":228,"name":"_call_with_frames_removed"},{"filename":"/files/plugins/my_plugin.py","lineno":2,"name":"<module>"}]}]

[2025-03-05T07:31:45.017015Z] DEBUG - Loading plugins from entrypoints logger="airflow.plugins_manager"

[2025-03-05T07:31:45.017504Z] DEBUG - Importing entry_point plugin hive logger="airflow.plugins_manager"

[2025-03-05T07:31:45.024081Z] DEBUG - Importing entry_point plugin databricks_workflow logger="airflow.plugins_manager"

[2025-03-05T07:31:45.039245Z] DEBUG - Importing entry_point plugin edge_executor logger="airflow.plugins_manager"

[2025-03-05T07:31:45.045887Z] DEBUG - Importing entry_point plugin openlineage logger="airflow.plugins_manager"

[2025-03-05T07:31:45.120448Z] DEBUG - Importing entry_point plugin databricks_workflow logger="airflow.plugins_manager"

[2025-03-05T07:31:45.121064Z] DEBUG - Importing entry_point plugin hive logger="airflow.plugins_manager"

[2025-03-05T07:31:45.121515Z] DEBUG - Importing entry_point plugin edge_executor logger="airflow.plugins_manager"

[2025-03-05T07:31:45.121877Z] DEBUG - Importing entry_point plugin openlineage logger="airflow.plugins_manager"

[2025-03-05T07:31:45.122289Z] DEBUG - Loading 7 plugin(s) took 641.19 seconds logger="airflow.plugins_manager"

[2025-03-05T07:31:45.122583Z] DEBUG - Calling 'on_starting' with {'component': <airflow.sdk.execution_time.task_runner.TaskRunnerMarker object at 0xffff877ea6d0>} logger="airflow.listeners.listener"

[2025-03-05T07:31:45.122827Z] DEBUG - Hook impls: [] logger="airflow.listeners.listener"

[2025-03-05T07:31:45.122926Z] DEBUG - Result from 'on_starting': [] logger="airflow.listeners.listener"

[2025-03-05T07:31:45.124100Z] INFO - DAG bundles loaded: dags-folder, example_dags logger="airflow.dag_processing.bundles.manager.DagBundlesManager"

[2025-03-05T07:31:45.124317Z] INFO - Filling up the DagBag from /files/dags/dags/bug-ce.py logger="airflow.models.dagbag.DagBag"

[2025-03-05T07:31:45.124581Z] DEBUG - Importing /files/dags/dags/bug-ce.py logger="airflow.models.dagbag.DagBag"

[2025-03-05T07:31:45.129295Z] DEBUG - Loaded DAG <DAG: virtualenv_python_operator> logger="airflow.models.dagbag.DagBag"

[2025-03-05T07:31:45.129575Z] DEBUG - DAG file parsed file="dags/bug-ce.py" logger="task"

[2025-03-05T07:31:45.187530Z] DEBUG - Sending request json="{\"rendered_fields\":{\"op_args\":\"()\",\"templates_dict\":null,\"requirements\":[\"colorama==0.4.0\"],\"index_urls\":null,\"venv_cache_path\":null,\"op_kwargs\":{}},\"type\":\"SetRenderedFields\"}\n" logger="task"

[2025-03-05T07:31:45.187852Z] DEBUG - Calling 'on_task_instance_running' with {'previous_state': <TaskInstanceState.QUEUED: 'queued'>, 'task_instance': RuntimeTaskInstance(id=UUID('01956537-adbd-7b50-80fe-583631d0f323'), task_id='virtualenv_python', dag_id='virtualenv_python_operator', run_id='manual__2025-03-05T07:31:42.634180+00:00_Vfvn3SQt', try_number=1, map_index=-1, hostname='12b15570bb78', task=<Task(PythonVirtualenvOperator): virtualenv_python>, max_tries=0, start_date=datetime.datetime(2025, 3, 5, 7, 31, 44, 441399, tzinfo=TzInfo(UTC)))} logger="airflow.listeners.listener"

[2025-03-05T07:31:45.187906Z] DEBUG - Hook impls: [<HookImpl plugin_name='airflow.example_dags.plugins.event_listener', plugin=<module 'airflow.example_dags.plugins.event_listener' from '/opt/airflow/airflow/example_dags/plugins/event_listener.py'>>] logger="airflow.listeners.listener"

[2025-03-05T07:31:45.188037Z] DEBUG - Result from 'on_task_instance_running': [] logger="airflow.listeners.listener"

[2025-03-05T07:31:45.197909Z] WARNING - PythonVirtualenvOperator.execute cannot be called outside TaskInstance! logger="airflow.task.operators.airflow.providers.standard.operators.python.PythonVirtualenvOperator"

[2025-03-05T07:31:45.197981Z] WARNING - PythonVirtualenvOperator.execute cannot be called outside TaskInstance! logger="airflow.task.operators.airflow.providers.standard.operators.python.PythonVirtualenvOperator"

[2025-03-05T07:31:45.198578Z] INFO - Executing cmd: uv venv --allow-existing --seed --python python /tmp/venv45hz_3xa logger="airflow.utils.process_utils"

[2025-03-05T07:31:45.206278Z] INFO - Output: logger="airflow.utils.process_utils"

[2025-03-05T07:31:45.206743Z] INFO - Task instance is in running state chan="stdout" logger="task"

[2025-03-05T07:31:45.206764Z] INFO - Previous state of the Task instance: queued chan="stdout" logger="task"

[2025-03-05T07:31:45.206777Z] INFO - Current task name:virtualenv_python chan="stdout" logger="task"

[2025-03-05T07:31:45.206793Z] INFO - Dag name:virtualenv_python_operator chan="stdout" logger="task"

[2025-03-05T07:31:45.248073Z] INFO - Using CPython 3.9.21 interpreter at: /usr/local/bin/python logger="airflow.utils.process_utils"

[2025-03-05T07:31:45.248171Z] INFO - Creating virtual environment with seed packages at: /tmp/venv45hz_3xa logger="airflow.utils.process_utils"

[2025-03-05T07:31:46.931764Z] INFO - + pip==25.0.1 logger="airflow.utils.process_utils"

[2025-03-05T07:31:46.931899Z] INFO - + setuptools==75.8.2 logger="airflow.utils.process_utils"

[2025-03-05T07:31:46.931932Z] INFO - + wheel==0.45.1 logger="airflow.utils.process_utils"

[2025-03-05T07:31:46.931995Z] INFO - Activate with: source /tmp/venv45hz_3xa/bin/activate logger="airflow.utils.process_utils"

[2025-03-05T07:31:46.934027Z] INFO - Executing cmd: uv pip install --python /tmp/venv45hz_3xa/bin/python -r /tmp/venv45hz_3xa/requirements.txt logger="airflow.utils.process_utils"

[2025-03-05T07:31:46.941631Z] INFO - Output: logger="airflow.utils.process_utils"

[2025-03-05T07:31:46.993076Z] INFO - Using Python 3.9.21 environment at: /tmp/venv45hz_3xa logger="airflow.utils.process_utils"

[2025-03-05T07:31:47.166176Z] INFO - Resolved 1 package in 171ms logger="airflow.utils.process_utils"

[2025-03-05T07:31:47.189297Z] INFO - Prepared 1 package in 22ms logger="airflow.utils.process_utils"

[2025-03-05T07:31:47.193621Z] INFO - Installed 1 package in 4ms logger="airflow.utils.process_utils"

[2025-03-05T07:31:47.193748Z] INFO - + colorama==0.4.0 logger="airflow.utils.process_utils"

[2025-03-05T07:31:47.205122Z] INFO - Executing cmd: /tmp/venv45hz_3xa/bin/python /tmp/venv-callxpo2rnmx/script.py /tmp/venv-callxpo2rnmx/script.in /tmp/venv-callxpo2rnmx/script.out /tmp/venv-callxpo2rnmx/string_args.txt /tmp/venv-callxpo2rnmx/termination.log /tmp/venv-callxpo2rnmx/airflow_context.json logger="airflow.utils.process_utils"

[2025-03-05T07:31:47.214445Z] INFO - Output: logger="airflow.utils.process_utils"

[2025-03-05T07:31:47.260343Z] INFO - \x1b[31msome red text logger="airflow.utils.process_utils"

[2025-03-05T07:31:47.260438Z] INFO - \x1b[42mand with a green background logger="airflow.utils.process_utils"

[2025-03-05T07:31:47.260467Z] INFO - \x1b[2mand in dim text logger="airflow.utils.process_utils"

[2025-03-05T07:31:47.260488Z] INFO - \x1b[0m logger="airflow.utils.process_utils"

[2025-03-05T07:31:47.260518Z] INFO - \x1b[2mPlease wait... logger="airflow.utils.process_utils"

[2025-03-05T07:31:57.274693Z] INFO - \x1b[2mPlease wait... logger="airflow.utils.process_utils"

[2025-03-05T07:32:07.283704Z] INFO - \x1b[2mPlease wait... logger="airflow.utils.process_utils"

[2025-03-05T07:32:17.298063Z] INFO - \x1b[2mPlease wait... logger="airflow.utils.process_utils"

[2025-03-05T07:32:27.304833Z] INFO - \x1b[2mPlease wait... logger="airflow.utils.process_utils"

[2025-03-05T07:32:37.325562Z] INFO - \x1b[2mPlease wait... logger="airflow.utils.process_utils"

[2025-03-05T07:32:47.346042Z] INFO - \x1b[2mPlease wait... logger="airflow.utils.process_utils"

[2025-03-05T07:32:57.365319Z] INFO - \x1b[2mPlease wait... logger="airflow.utils.process_utils"

[2025-03-05T07:33:07.376676Z] INFO - \x1b[2mPlease wait... logger="airflow.utils.process_utils"

[2025-03-05T07:33:17.396723Z] INFO - \x1b[2mPlease wait... logger="airflow.utils.process_utils"

[2025-03-05T07:33:58.405640Z] INFO - Finished logger="airflow.utils.process_utils"

[2025-03-05T07:33:58.459264Z] INFO - Done. Returned value was: None logger="airflow.task.operators.airflow.providers.standard.operators.python.PythonVirtualenvOperator"

[2025-03-05T07:33:58.459744Z] DEBUG - Sending request json="{\"state\":\"success\",\"end_date\":\"2025-03-05T07:33:58.459592Z\",\"task_outlets\":[],\"outlet_events\":[],\"type\":\"SucceedTask\"}\n" logger="task"

[2025-03-05T07:33:58.459865Z] DEBUG - Running finalizers ti="RuntimeTaskInstance(id=UUID('01956537-adbd-7b50-80fe-583631d0f323'), task_id='virtualenv_python', dag_id='virtualenv_python_operator', run_id='manual__2025-03-05T07:31:42.634180+00:00_Vfvn3SQt', try_number=1, map_index=-1, hostname='12b15570bb78', task=<Task(PythonVirtualenvOperator): virtualenv_python>, max_tries=0, start_date=datetime.datetime(2025, 3, 5, 7, 31, 44, 441399, tzinfo=TzInfo(UTC)))" logger="task"

[2025-03-05T07:33:58.461605Z] DEBUG - Calling 'on_task_instance_success' with {'previous_state': <TaskInstanceState.RUNNING: 'running'>, 'task_instance': RuntimeTaskInstance(id=UUID('01956537-adbd-7b50-80fe-583631d0f323'), task_id='virtualenv_python', dag_id='virtualenv_python_operator', run_id='manual__2025-03-05T07:31:42.634180+00:00_Vfvn3SQt', try_number=1, map_index=-1, hostname='12b15570bb78', task=<Task(PythonVirtualenvOperator): virtualenv_python>, max_tries=0, start_date=datetime.datetime(2025, 3, 5, 7, 31, 44, 441399, tzinfo=TzInfo(UTC)))} logger="airflow.listeners.listener"

[2025-03-05T07:33:58.461687Z] DEBUG - Hook impls: [<HookImpl plugin_name='airflow.example_dags.plugins.event_listener', plugin=<module 'airflow.example_dags.plugins.event_listener' from '/opt/airflow/airflow/example_dags/plugins/event_listener.py'>>] logger="airflow.listeners.listener"

[2025-03-05T07:33:58.462171Z] DEBUG - Result from 'on_task_instance_success': [] logger="airflow.listeners.listener"

[2025-03-05T07:33:58.462226Z] DEBUG - Calling 'before_stopping' with {'component': <airflow.sdk.execution_time.task_runner.TaskRunnerMarker object at 0xffff9f340fd0>} logger="airflow.listeners.listener"

[2025-03-05T07:33:58.462252Z] DEBUG - Hook impls: [] logger="airflow.listeners.listener"

[2025-03-05T07:33:58.462275Z] DEBUG - Result from 'before_stopping': [] logger="airflow.listeners.listener"

[2025-03-05T07:33:58.488373Z] INFO - Task instance in success state chan="stdout" logger="task"

[2025-03-05T07:33:58.488395Z] INFO - Previous state of the Task instance: running chan="stdout" logger="task"

[2025-03-05T07:33:58.488409Z] INFO - Task operator:<Task(PythonVirtualenvOperator): virtualenv_python> chan="stdout" logger="task"

[2025-03-05T07:33:58.490037Z] WARNING - Airflow core logging is not using a FileTaskHandler, can't upload logs to remote handler="<class 'NoneType'>" logger="task"


@amoghrajesh amoghrajesh requested a review from eladkal March 5, 2025 08:20
@eladkal eladkal merged commit 73627fd into apache:main Mar 5, 2025
61 checks passed
shahar1 pushed a commit to shahar1/airflow that referenced this pull request Mar 5, 2025
apache#47375)

* Scheduler should not crash when executor_config is provided with CE

* removing bad test
nailo2c pushed a commit to nailo2c/airflow that referenced this pull request Apr 4, 2025
apache#47375)

* Scheduler should not crash when executor_config is provided with CE

* removing bad test

Successfully merging this pull request may close these issues.

Scheduler crash while using PythonVirtualenvOperator
