From cd39453a2ceffb89697d3d2decbe8bee9869874c Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Fri, 1 Nov 2024 00:35:19 +0100 Subject: [PATCH] Switch PythonVirtualenvOperator to venv from virtualenv package The PythonVirtualenvOperator has been using virtualenv in order to support Python 2.7 and pre Python 3.5, however we do not need those any more - and we can use built-in venv instead. Fixes: #40420 --- Dockerfile | 2 +- INSTALL | 3 +- .../example_dags/example_branch_operator.py | 217 +++++++++--------- .../example_branch_operator_decorator.py | 155 ++++++------- .../example_dags/example_python_decorator.py | 108 +++++---- .../example_dags/example_python_operator.py | 116 +++++----- .../tutorial_taskflow_api_virtualenv.py | 111 +++++---- .../12_airflow_dependencies_and_extras.rst | 3 +- .../doc/images/output_prod-image_build.txt | 2 +- .../src/airflow_breeze/global_constants.py | 1 - docker_tests/test_prod_image.py | 1 - docs/apache-airflow/extra-packages-ref.rst | 2 - docs/docker-stack/build-arg-ref.rst | 1 - docs/docker-stack/changelog.rst | 6 + generated/provider_dependencies.json | 3 +- hatch_build.py | 6 +- newsfragments/43568.significant.rst | 1 + .../airflow/providers/standard/CHANGELOG.rst | 2 + .../providers/standard/operators/python.py | 14 -- .../airflow/providers/standard/provider.yaml | 1 - .../standard/utils/python_virtualenv.py | 13 +- .../tests/standard/operators/test_python.py | 12 - .../standard/utils/test_python_virtualenv.py | 13 +- pyproject.toml | 3 +- tests/decorators/test_python_virtualenv.py | 2 +- 25 files changed, 372 insertions(+), 426 deletions(-) create mode 100644 newsfragments/43568.significant.rst diff --git a/Dockerfile b/Dockerfile index ca0173de3b131..25ceb86307766 100644 --- a/Dockerfile +++ b/Dockerfile @@ -36,7 +36,7 @@ # much smaller. 
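The heart of the change is ``_generate_venv_cmd`` in ``python_virtualenv.py`` further down in this patch: environments are now created with the interpreter's built-in ``venv`` module instead of by shelling out to the ``virtualenv`` package. A minimal sketch of the before/after command construction (it only mirrors that hunk, nothing beyond it)::

    import sys

    def _old_virtualenv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool) -> list[str]:
        # Before: always run virtualenv under the Airflow interpreter and point it
        # at the target interpreter with --python.
        cmd = [sys.executable, "-m", "virtualenv", tmp_dir]
        if system_site_packages:
            cmd.append("--system-site-packages")
        if python_bin is not None:
            cmd.append(f"--python={python_bin}")
        return cmd

    def _new_venv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool) -> list[str]:
        # After: run the stdlib venv module under the target interpreter itself,
        # which is also how environments for other Python versions get created.
        if python_bin is None:
            python_bin = sys.executable
        cmd = [python_bin, "-m", "venv", tmp_dir]
        if system_site_packages:
            cmd.append("--system-site-packages")
        return cmd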
# # Use the same builder frontend version for everyone -ARG AIRFLOW_EXTRAS="aiobotocore,amazon,async,celery,cncf-kubernetes,common-io,docker,elasticsearch,fab,ftp,google,google-auth,graphviz,grpc,hashicorp,http,ldap,microsoft-azure,mysql,odbc,openlineage,pandas,postgres,redis,sendgrid,sftp,slack,snowflake,ssh,statsd,uv,virtualenv" +ARG AIRFLOW_EXTRAS="aiobotocore,amazon,async,celery,cncf-kubernetes,common-io,docker,elasticsearch,fab,ftp,google,google-auth,graphviz,grpc,hashicorp,http,ldap,microsoft-azure,mysql,odbc,openlineage,pandas,postgres,redis,sendgrid,sftp,slack,snowflake,ssh,statsd,uv" ARG ADDITIONAL_AIRFLOW_EXTRAS="" ARG ADDITIONAL_PYTHON_DEPS="" diff --git a/INSTALL b/INSTALL index 7520cb88aff37..edf738f5e114d 100644 --- a/INSTALL +++ b/INSTALL @@ -259,8 +259,7 @@ Those extras are available as regular core airflow extras - they install optiona # START CORE EXTRAS HERE aiobotocore, apache-atlas, apache-webhdfs, async, cgroups, cloudpickle, github-enterprise, google- -auth, graphviz, kerberos, ldap, leveldb, otel, pandas, password, rabbitmq, s3fs, sentry, statsd, uv, -virtualenv +auth, graphviz, kerberos, ldap, leveldb, otel, pandas, password, rabbitmq, s3fs, sentry, statsd, uv # END CORE EXTRAS HERE diff --git a/airflow/example_dags/example_branch_operator.py b/airflow/example_dags/example_branch_operator.py index db79ac0c211a0..35f7a9b548e56 100644 --- a/airflow/example_dags/example_branch_operator.py +++ b/airflow/example_dags/example_branch_operator.py @@ -29,143 +29,140 @@ import pendulum -from airflow.providers.standard.operators.python import is_venv_installed - -if is_venv_installed(): - from airflow.models.dag import DAG - from airflow.operators.empty import EmptyOperator - from airflow.providers.standard.operators.python import ( - BranchExternalPythonOperator, - BranchPythonOperator, - BranchPythonVirtualenvOperator, - ExternalPythonOperator, - PythonOperator, - PythonVirtualenvOperator, +from airflow.models.dag import DAG +from airflow.operators.empty import EmptyOperator +from airflow.providers.standard.operators.python import ( + BranchExternalPythonOperator, + BranchPythonOperator, + BranchPythonVirtualenvOperator, + ExternalPythonOperator, + PythonOperator, + PythonVirtualenvOperator, +) +from airflow.utils.edgemodifier import Label +from airflow.utils.trigger_rule import TriggerRule + +PATH_TO_PYTHON_BINARY = sys.executable + +with DAG( + dag_id="example_branch_operator", + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + catchup=False, + schedule="@daily", + tags=["example", "example2"], + orientation="TB", +) as dag: + run_this_first = EmptyOperator( + task_id="run_this_first", ) - from airflow.utils.edgemodifier import Label - from airflow.utils.trigger_rule import TriggerRule - - PATH_TO_PYTHON_BINARY = sys.executable - - with DAG( - dag_id="example_branch_operator", - start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), - catchup=False, - schedule="@daily", - tags=["example", "example2"], - orientation="TB", - ) as dag: - run_this_first = EmptyOperator( - task_id="run_this_first", - ) - options = ["a", "b", "c", "d"] + options = ["a", "b", "c", "d"] + + # Example branching on standard Python tasks - # Example branching on standard Python tasks + # [START howto_operator_branch_python] + branching = BranchPythonOperator( + task_id="branching", + python_callable=lambda: f"branch_{random.choice(options)}", + ) + # [END howto_operator_branch_python] + run_this_first >> branching + + join = EmptyOperator( + task_id="join", + 
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS, + ) - # [START howto_operator_branch_python] - branching = BranchPythonOperator( - task_id="branching", - python_callable=lambda: f"branch_{random.choice(options)}", + for option in options: + t = PythonOperator( + task_id=f"branch_{option}", + python_callable=lambda: print("Hello World"), ) - # [END howto_operator_branch_python] - run_this_first >> branching - join = EmptyOperator( - task_id="join", - trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS, + empty_follow = EmptyOperator( + task_id="follow_" + option, ) - for option in options: - t = PythonOperator( - task_id=f"branch_{option}", - python_callable=lambda: print("Hello World"), - ) + # Label is optional here, but it can help identify more complex branches + branching >> Label(option) >> t >> empty_follow >> join - empty_follow = EmptyOperator( - task_id="follow_" + option, - ) + # Example the same with external Python calls - # Label is optional here, but it can help identify more complex branches - branching >> Label(option) >> t >> empty_follow >> join + # [START howto_operator_branch_ext_py] + def branch_with_external_python(choices): + import random - # Example the same with external Python calls + return f"ext_py_{random.choice(choices)}" - # [START howto_operator_branch_ext_py] - def branch_with_external_python(choices): - import random + branching_ext_py = BranchExternalPythonOperator( + task_id="branching_ext_python", + python=PATH_TO_PYTHON_BINARY, + python_callable=branch_with_external_python, + op_args=[options], + ) + # [END howto_operator_branch_ext_py] + join >> branching_ext_py + + join_ext_py = EmptyOperator( + task_id="join_ext_python", + trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS, + ) - return f"ext_py_{random.choice(choices)}" + def hello_world_with_external_python(): + print("Hello World from external Python") - branching_ext_py = BranchExternalPythonOperator( - task_id="branching_ext_python", + for option in options: + t = ExternalPythonOperator( + task_id=f"ext_py_{option}", python=PATH_TO_PYTHON_BINARY, - python_callable=branch_with_external_python, - op_args=[options], + python_callable=hello_world_with_external_python, ) - # [END howto_operator_branch_ext_py] - join >> branching_ext_py - join_ext_py = EmptyOperator( - task_id="join_ext_python", - trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS, - ) + # Label is optional here, but it can help identify more complex branches + branching_ext_py >> Label(option) >> t >> join_ext_py - def hello_world_with_external_python(): - print("Hello World from external Python") + # Example the same with Python virtual environments - for option in options: - t = ExternalPythonOperator( - task_id=f"ext_py_{option}", - python=PATH_TO_PYTHON_BINARY, - python_callable=hello_world_with_external_python, - ) + # [START howto_operator_branch_virtualenv] + # Note: Passing a caching dir allows to keep the virtual environment over multiple runs + # Run the example a second time and see that it re-uses it and is faster. 
+ VENV_CACHE_PATH = Path(tempfile.gettempdir()) - # Label is optional here, but it can help identify more complex branches - branching_ext_py >> Label(option) >> t >> join_ext_py + def branch_with_venv(choices): + import random - # Example the same with Python virtual environments + import numpy as np - # [START howto_operator_branch_virtualenv] - # Note: Passing a caching dir allows to keep the virtual environment over multiple runs - # Run the example a second time and see that it re-uses it and is faster. - VENV_CACHE_PATH = Path(tempfile.gettempdir()) + print(f"Some numpy stuff: {np.arange(6)}") + return f"venv_{random.choice(choices)}" - def branch_with_venv(choices): - import random + branching_venv = BranchPythonVirtualenvOperator( + task_id="branching_venv", + requirements=["numpy~=1.26.0"], + venv_cache_path=VENV_CACHE_PATH, + python_callable=branch_with_venv, + op_args=[options], + ) + # [END howto_operator_branch_virtualenv] + join_ext_py >> branching_venv + + join_venv = EmptyOperator( + task_id="join_venv", + trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS, + ) - import numpy as np + def hello_world_with_venv(): + import numpy as np - print(f"Some numpy stuff: {np.arange(6)}") - return f"venv_{random.choice(choices)}" + print(f"Hello World with some numpy stuff: {np.arange(6)}") - branching_venv = BranchPythonVirtualenvOperator( - task_id="branching_venv", + for option in options: + t = PythonVirtualenvOperator( + task_id=f"venv_{option}", requirements=["numpy~=1.26.0"], venv_cache_path=VENV_CACHE_PATH, - python_callable=branch_with_venv, - op_args=[options], - ) - # [END howto_operator_branch_virtualenv] - join_ext_py >> branching_venv - - join_venv = EmptyOperator( - task_id="join_venv", - trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS, + python_callable=hello_world_with_venv, ) - def hello_world_with_venv(): - import numpy as np - - print(f"Hello World with some numpy stuff: {np.arange(6)}") - - for option in options: - t = PythonVirtualenvOperator( - task_id=f"venv_{option}", - requirements=["numpy~=1.26.0"], - venv_cache_path=VENV_CACHE_PATH, - python_callable=hello_world_with_venv, - ) - - # Label is optional here, but it can help identify more complex branches - branching_venv >> Label(option) >> t >> join_venv + # Label is optional here, but it can help identify more complex branches + branching_venv >> Label(option) >> t >> join_venv diff --git a/airflow/example_dags/example_branch_operator_decorator.py b/airflow/example_dags/example_branch_operator_decorator.py index 6777db38a9b6c..e9b3bea97a72f 100644 --- a/airflow/example_dags/example_branch_operator_decorator.py +++ b/airflow/example_dags/example_branch_operator_decorator.py @@ -30,121 +30,116 @@ import pendulum -from airflow.providers.standard.operators.python import is_venv_installed +from airflow.decorators import task +from airflow.models.dag import DAG +from airflow.operators.empty import EmptyOperator +from airflow.utils.edgemodifier import Label +from airflow.utils.trigger_rule import TriggerRule -if is_venv_installed(): - from airflow.decorators import task - from airflow.models.dag import DAG - from airflow.operators.empty import EmptyOperator - from airflow.utils.edgemodifier import Label - from airflow.utils.trigger_rule import TriggerRule +PATH_TO_PYTHON_BINARY = sys.executable - PATH_TO_PYTHON_BINARY = sys.executable +with DAG( + dag_id="example_branch_python_operator_decorator", + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + catchup=False, + schedule="@daily", + tags=["example", 
"example2"], + orientation="TB", +) as dag: + run_this_first = EmptyOperator(task_id="run_this_first") - with DAG( - dag_id="example_branch_python_operator_decorator", - start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), - catchup=False, - schedule="@daily", - tags=["example", "example2"], - orientation="TB", - ) as dag: - run_this_first = EmptyOperator(task_id="run_this_first") + options = ["a", "b", "c", "d"] - options = ["a", "b", "c", "d"] + # Example branching on standard Python tasks - # Example branching on standard Python tasks + # [START howto_operator_branch_python] + @task.branch() + def branching(choices: list[str]) -> str: + return f"branch_{random.choice(choices)}" - # [START howto_operator_branch_python] - @task.branch() - def branching(choices: list[str]) -> str: - return f"branch_{random.choice(choices)}" + # [END howto_operator_branch_python] - # [END howto_operator_branch_python] + random_choice_instance = branching(choices=options) - random_choice_instance = branching(choices=options) + run_this_first >> random_choice_instance - run_this_first >> random_choice_instance + join = EmptyOperator(task_id="join", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) - join = EmptyOperator(task_id="join", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) + for option in options: - for option in options: + @task(task_id=f"branch_{option}") + def some_task(): + print("doing something in Python") - @task(task_id=f"branch_{option}") - def some_task(): - print("doing something in Python") + t = some_task() + empty = EmptyOperator(task_id=f"follow_{option}") - t = some_task() - empty = EmptyOperator(task_id=f"follow_{option}") + # Label is optional here, but it can help identify more complex branches + random_choice_instance >> Label(option) >> t >> empty >> join - # Label is optional here, but it can help identify more complex branches - random_choice_instance >> Label(option) >> t >> empty >> join + # Example the same with external Python calls - # Example the same with external Python calls + # [START howto_operator_branch_ext_py] + @task.branch_external_python(python=PATH_TO_PYTHON_BINARY) + def branching_ext_python(choices) -> str: + import random - # [START howto_operator_branch_ext_py] - @task.branch_external_python(python=PATH_TO_PYTHON_BINARY) - def branching_ext_python(choices) -> str: - import random + return f"ext_py_{random.choice(choices)}" - return f"ext_py_{random.choice(choices)}" + # [END howto_operator_branch_ext_py] - # [END howto_operator_branch_ext_py] + random_choice_ext_py = branching_ext_python(choices=options) - random_choice_ext_py = branching_ext_python(choices=options) + join >> random_choice_ext_py - join >> random_choice_ext_py + join_ext_py = EmptyOperator(task_id="join_ext_py", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) - join_ext_py = EmptyOperator( - task_id="join_ext_py", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS - ) - - for option in options: + for option in options: - @task.external_python(task_id=f"ext_py_{option}", python=PATH_TO_PYTHON_BINARY) - def some_ext_py_task(): - print("doing something in external Python") + @task.external_python(task_id=f"ext_py_{option}", python=PATH_TO_PYTHON_BINARY) + def some_ext_py_task(): + print("doing something in external Python") - t = some_ext_py_task() + t = some_ext_py_task() - # Label is optional here, but it can help identify more complex branches - random_choice_ext_py >> Label(option) >> t >> join_ext_py + # Label is optional here, but it can help identify more complex 
branches + random_choice_ext_py >> Label(option) >> t >> join_ext_py - # Example the same with Python virtual environments + # Example the same with Python virtual environments - # [START howto_operator_branch_virtualenv] - # Note: Passing a caching dir allows to keep the virtual environment over multiple runs - # Run the example a second time and see that it re-uses it and is faster. - VENV_CACHE_PATH = tempfile.gettempdir() + # [START howto_operator_branch_virtualenv] + # Note: Passing a caching dir allows to keep the virtual environment over multiple runs + # Run the example a second time and see that it re-uses it and is faster. + VENV_CACHE_PATH = tempfile.gettempdir() - @task.branch_virtualenv(requirements=["numpy~=1.24.4"], venv_cache_path=VENV_CACHE_PATH) - def branching_virtualenv(choices) -> str: - import random + @task.branch_virtualenv(requirements=["numpy~=1.24.4"], venv_cache_path=VENV_CACHE_PATH) + def branching_virtualenv(choices) -> str: + import random - import numpy as np + import numpy as np - print(f"Some numpy stuff: {np.arange(6)}") - return f"venv_{random.choice(choices)}" + print(f"Some numpy stuff: {np.arange(6)}") + return f"venv_{random.choice(choices)}" - # [END howto_operator_branch_virtualenv] + # [END howto_operator_branch_virtualenv] - random_choice_venv = branching_virtualenv(choices=options) + random_choice_venv = branching_virtualenv(choices=options) - join_ext_py >> random_choice_venv + join_ext_py >> random_choice_venv - join_venv = EmptyOperator(task_id="join_venv", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) + join_venv = EmptyOperator(task_id="join_venv", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) - for option in options: + for option in options: - @task.virtualenv( - task_id=f"venv_{option}", requirements=["numpy~=1.24.4"], venv_cache_path=VENV_CACHE_PATH - ) - def some_venv_task(): - import numpy as np + @task.virtualenv( + task_id=f"venv_{option}", requirements=["numpy~=1.24.4"], venv_cache_path=VENV_CACHE_PATH + ) + def some_venv_task(): + import numpy as np - print(f"Some numpy stuff: {np.arange(6)}") + print(f"Some numpy stuff: {np.arange(6)}") - t = some_venv_task() + t = some_venv_task() - # Label is optional here, but it can help identify more complex branches - random_choice_venv >> Label(option) >> t >> join_venv + # Label is optional here, but it can help identify more complex branches + random_choice_venv >> Label(option) >> t >> join_venv diff --git a/airflow/example_dags/example_python_decorator.py b/airflow/example_dags/example_python_decorator.py index 62de60fd6987b..7619bc3b6a517 100644 --- a/airflow/example_dags/example_python_decorator.py +++ b/airflow/example_dags/example_python_decorator.py @@ -30,7 +30,6 @@ import pendulum from airflow.decorators import dag, task -from airflow.providers.standard.operators.python import is_venv_installed log = logging.getLogger(__name__) @@ -76,61 +75,58 @@ def my_sleeping_function(random_base): run_this >> log_the_sql >> sleeping_task # [END howto_operator_python_kwargs] - if not is_venv_installed(): - log.warning("The virtalenv_python example task requires virtualenv, please install it.") - else: - # [START howto_operator_python_venv] - @task.virtualenv( - task_id="virtualenv_python", requirements=["colorama==0.4.0"], system_site_packages=False - ) - def callable_virtualenv(): - """ - Example function that will be performed in a virtual environment. - - Importing at the module level ensures that it will not attempt to import the - library before it is installed. 
- """ - from time import sleep - - from colorama import Back, Fore, Style - - print(Fore.RED + "some red text") - print(Back.GREEN + "and with a green background") - print(Style.DIM + "and in dim text") - print(Style.RESET_ALL) - for _ in range(4): - print(Style.DIM + "Please wait...", flush=True) - sleep(1) - print("Finished") - - virtualenv_task = callable_virtualenv() - # [END howto_operator_python_venv] - - sleeping_task >> virtualenv_task - - # [START howto_operator_external_python] - @task.external_python(task_id="external_python", python=PATH_TO_PYTHON_BINARY) - def callable_external_python(): - """ - Example function that will be performed in a virtual environment. - - Importing at the module level ensures that it will not attempt to import the - library before it is installed. - """ - import sys - from time import sleep - - print(f"Running task via {sys.executable}") - print("Sleeping") - for _ in range(4): - print("Please wait...", flush=True) - sleep(1) - print("Finished") - - external_python_task = callable_external_python() - # [END howto_operator_external_python] - - run_this >> external_python_task >> virtualenv_task + # [START howto_operator_python_venv] + @task.virtualenv( + task_id="virtualenv_python", requirements=["colorama==0.4.0"], system_site_packages=False + ) + def callable_virtualenv(): + """ + Example function that will be performed in a virtual environment. + + Importing at the module level ensures that it will not attempt to import the + library before it is installed. + """ + from time import sleep + + from colorama import Back, Fore, Style + + print(Fore.RED + "some red text") + print(Back.GREEN + "and with a green background") + print(Style.DIM + "and in dim text") + print(Style.RESET_ALL) + for _ in range(4): + print(Style.DIM + "Please wait...", flush=True) + sleep(1) + print("Finished") + + virtualenv_task = callable_virtualenv() + # [END howto_operator_python_venv] + + sleeping_task >> virtualenv_task + + # [START howto_operator_external_python] + @task.external_python(task_id="external_python", python=PATH_TO_PYTHON_BINARY) + def callable_external_python(): + """ + Example function that will be performed in a virtual environment. + + Importing at the module level ensures that it will not attempt to import the + library before it is installed. + """ + import sys + from time import sleep + + print(f"Running task via {sys.executable}") + print("Sleeping") + for _ in range(4): + print("Please wait...", flush=True) + sleep(1) + print("Finished") + + external_python_task = callable_external_python() + # [END howto_operator_external_python] + + run_this >> external_python_task >> virtualenv_task example_python_decorator() diff --git a/airflow/example_dags/example_python_operator.py b/airflow/example_dags/example_python_operator.py index 8452dfb9358be..976813d53fd98 100644 --- a/airflow/example_dags/example_python_operator.py +++ b/airflow/example_dags/example_python_operator.py @@ -34,7 +34,6 @@ ExternalPythonOperator, PythonOperator, PythonVirtualenvOperator, - is_venv_installed, ) log = logging.getLogger(__name__) @@ -89,63 +88,60 @@ def my_sleeping_function(random_base): run_this >> log_the_sql >> sleeping_task # [END howto_operator_python_kwargs] - if not is_venv_installed(): - log.warning("The virtalenv_python example task requires virtualenv, please install it.") - else: - # [START howto_operator_python_venv] - def callable_virtualenv(): - """ - Example function that will be performed in a virtual environment. 
- - Importing at the function level ensures that it will not attempt to import the - library before it is installed. - """ - from time import sleep - - from colorama import Back, Fore, Style - - print(Fore.RED + "some red text") - print(Back.GREEN + "and with a green background") - print(Style.DIM + "and in dim text") - print(Style.RESET_ALL) - for _ in range(4): - print(Style.DIM + "Please wait...", flush=True) - sleep(1) - print("Finished") - - virtualenv_task = PythonVirtualenvOperator( - task_id="virtualenv_python", - python_callable=callable_virtualenv, - requirements=["colorama==0.4.0"], - system_site_packages=False, - ) - # [END howto_operator_python_venv] - - sleeping_task >> virtualenv_task - - # [START howto_operator_external_python] - def callable_external_python(): - """ - Example function that will be performed in a virtual environment. - - Importing at the module level ensures that it will not attempt to import the - library before it is installed. - """ - import sys - from time import sleep - - print(f"Running task via {sys.executable}") - print("Sleeping") - for _ in range(4): - print("Please wait...", flush=True) - sleep(1) - print("Finished") - - external_python_task = ExternalPythonOperator( - task_id="external_python", - python_callable=callable_external_python, - python=PATH_TO_PYTHON_BINARY, - ) - # [END howto_operator_external_python] + # [START howto_operator_python_venv] + def callable_virtualenv(): + """ + Example function that will be performed in a virtual environment. + + Importing at the function level ensures that it will not attempt to import the + library before it is installed. + """ + from time import sleep + + from colorama import Back, Fore, Style + + print(Fore.RED + "some red text") + print(Back.GREEN + "and with a green background") + print(Style.DIM + "and in dim text") + print(Style.RESET_ALL) + for _ in range(4): + print(Style.DIM + "Please wait...", flush=True) + sleep(1) + print("Finished") + + virtualenv_task = PythonVirtualenvOperator( + task_id="virtualenv_python", + python_callable=callable_virtualenv, + requirements=["colorama==0.4.0"], + system_site_packages=False, + ) + # [END howto_operator_python_venv] + + sleeping_task >> virtualenv_task + + # [START howto_operator_external_python] + def callable_external_python(): + """ + Example function that will be performed in a virtual environment. + + Importing at the module level ensures that it will not attempt to import the + library before it is installed. 
+ """ + import sys + from time import sleep + + print(f"Running task via {sys.executable}") + print("Sleeping") + for _ in range(4): + print("Please wait...", flush=True) + sleep(1) + print("Finished") + + external_python_task = ExternalPythonOperator( + task_id="external_python", + python_callable=callable_external_python, + python=PATH_TO_PYTHON_BINARY, + ) + # [END howto_operator_external_python] - run_this >> external_python_task >> virtualenv_task + run_this >> external_python_task >> virtualenv_task diff --git a/airflow/example_dags/tutorial_taskflow_api_virtualenv.py b/airflow/example_dags/tutorial_taskflow_api_virtualenv.py index 357a1e7b290a9..fd9ee3e7b5abd 100644 --- a/airflow/example_dags/tutorial_taskflow_api_virtualenv.py +++ b/airflow/example_dags/tutorial_taskflow_api_virtualenv.py @@ -21,67 +21,64 @@ from datetime import datetime from airflow.decorators import dag, task -from airflow.providers.standard.operators.python import is_venv_installed log = logging.getLogger(__name__) -if not is_venv_installed(): - log.warning("The tutorial_taskflow_api_virtualenv example DAG requires virtualenv, please install it.") -else: - @dag(schedule=None, start_date=datetime(2021, 1, 1), catchup=False, tags=["example"]) - def tutorial_taskflow_api_virtualenv(): +@dag(schedule=None, start_date=datetime(2021, 1, 1), catchup=False, tags=["example"]) +def tutorial_taskflow_api_virtualenv(): + """ + ### TaskFlow API example using virtualenv + This is a simple data pipeline example which demonstrates the use of + the TaskFlow API using three simple tasks for Extract, Transform, and Load. + """ + + @task.virtualenv( + serializer="dill", # Use `dill` for advanced serialization. + system_site_packages=False, + requirements=["funcsigs"], + ) + def extract(): + """ + #### Extract task + A simple Extract task to get data ready for the rest of the data + pipeline. In this case, getting data is simulated by reading from a + hardcoded JSON string. + """ + import json + + data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}' + + order_data_dict = json.loads(data_string) + return order_data_dict + + @task(multiple_outputs=True) + def transform(order_data_dict: dict): + """ + #### Transform task + A simple Transform task which takes in the collection of order data and + computes the total order value. """ - ### TaskFlow API example using virtualenv - This is a simple data pipeline example which demonstrates the use of - the TaskFlow API using three simple tasks for Extract, Transform, and Load. + total_order_value = 0 + + for value in order_data_dict.values(): + total_order_value += value + + return {"total_order_value": total_order_value} + + @task() + def load(total_order_value: float): + """ + #### Load task + A simple Load task which takes in the result of the Transform task and + instead of saving it to end user review, just prints it out. """ - @task.virtualenv( - serializer="dill", # Use `dill` for advanced serialization. - system_site_packages=False, - requirements=["funcsigs"], - ) - def extract(): - """ - #### Extract task - A simple Extract task to get data ready for the rest of the data - pipeline. In this case, getting data is simulated by reading from a - hardcoded JSON string. 
- """ - import json - - data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}' - - order_data_dict = json.loads(data_string) - return order_data_dict - - @task(multiple_outputs=True) - def transform(order_data_dict: dict): - """ - #### Transform task - A simple Transform task which takes in the collection of order data and - computes the total order value. - """ - total_order_value = 0 - - for value in order_data_dict.values(): - total_order_value += value - - return {"total_order_value": total_order_value} - - @task() - def load(total_order_value: float): - """ - #### Load task - A simple Load task which takes in the result of the Transform task and - instead of saving it to end user review, just prints it out. - """ - - print(f"Total order value is: {total_order_value:.2f}") - - order_data = extract() - order_summary = transform(order_data) - load(order_summary["total_order_value"]) - - tutorial_dag = tutorial_taskflow_api_virtualenv() + print(f"Total order value is: {total_order_value:.2f}") + + order_data = extract() + order_summary = transform(order_data) + load(order_summary["total_order_value"]) + + +tutorial_dag = tutorial_taskflow_api_virtualenv() diff --git a/contributing-docs/12_airflow_dependencies_and_extras.rst b/contributing-docs/12_airflow_dependencies_and_extras.rst index 8bfbdb630c9f7..b21358db017f4 100644 --- a/contributing-docs/12_airflow_dependencies_and_extras.rst +++ b/contributing-docs/12_airflow_dependencies_and_extras.rst @@ -165,8 +165,7 @@ Those extras are available as regular core airflow extras - they install optiona .. START CORE EXTRAS HERE aiobotocore, apache-atlas, apache-webhdfs, async, cgroups, cloudpickle, github-enterprise, google- -auth, graphviz, kerberos, ldap, leveldb, otel, pandas, password, rabbitmq, s3fs, sentry, statsd, uv, -virtualenv +auth, graphviz, kerberos, ldap, leveldb, otel, pandas, password, rabbitmq, s3fs, sentry, statsd, uv .. END CORE EXTRAS HERE diff --git a/dev/breeze/doc/images/output_prod-image_build.txt b/dev/breeze/doc/images/output_prod-image_build.txt index 9c6c509a66902..8610d58e7ac25 100644 --- a/dev/breeze/doc/images/output_prod-image_build.txt +++ b/dev/breeze/doc/images/output_prod-image_build.txt @@ -1 +1 @@ -fe048412f9fc1527a30eaaf0a986fa16 +3576058438d009eb3dc8be53ea16be8a diff --git a/dev/breeze/src/airflow_breeze/global_constants.py b/dev/breeze/src/airflow_breeze/global_constants.py index b309620cd47de..d96e8204df3ea 100644 --- a/dev/breeze/src/airflow_breeze/global_constants.py +++ b/dev/breeze/src/airflow_breeze/global_constants.py @@ -555,7 +555,6 @@ def get_airflow_extras(): "ssh", "statsd", "uv", - "virtualenv", # END OF EXTRAS LIST UPDATED BY PRE COMMIT ] diff --git a/docker_tests/test_prod_image.py b/docker_tests/test_prod_image.py index a11ba9495db80..95c284dec1fe7 100644 --- a/docker_tests/test_prod_image.py +++ b/docker_tests/test_prod_image.py @@ -171,7 +171,6 @@ def test_pip_dependencies_conflict(self, default_docker_image): "sftp/ssh": ["paramiko", "sshtunnel"], "slack": ["slack_sdk"], "statsd": ["statsd"], - "virtualenv": ["virtualenv"], } @pytest.mark.skipif(os.environ.get("TEST_SLIM_IMAGE") == "true", reason="Skipped with slim image") diff --git a/docs/apache-airflow/extra-packages-ref.rst b/docs/apache-airflow/extra-packages-ref.rst index 92ab5b7ad8392..5d54a94a42217 100644 --- a/docs/apache-airflow/extra-packages-ref.rst +++ b/docs/apache-airflow/extra-packages-ref.rst @@ -87,8 +87,6 @@ python dependencies for the provided package. 
+---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+ | uv | ``pip install 'apache-airflow[uv]'`` | Install uv - fast, Rust-based package installer (experimental) | +---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+ -| virtualenv | ``pip install 'apache-airflow[virtualenv]'`` | Running python tasks in local virtualenv | -+---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+ | cloudpickle | pip install apache-airflow[cloudpickle] | Cloudpickle hooks and operators | +---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+ diff --git a/docs/docker-stack/build-arg-ref.rst b/docs/docker-stack/build-arg-ref.rst index 1c64db79c636f..b665f35b7c4bc 100644 --- a/docs/docker-stack/build-arg-ref.rst +++ b/docs/docker-stack/build-arg-ref.rst @@ -118,7 +118,6 @@ List of default extras in the production Dockerfile: * ssh * statsd * uv -* virtualenv .. END OF EXTRAS LIST UPDATED BY PRE COMMIT diff --git a/docs/docker-stack/changelog.rst b/docs/docker-stack/changelog.rst index 27df35239ece8..9b135b80b5535 100644 --- a/docs/docker-stack/changelog.rst +++ b/docs/docker-stack/changelog.rst @@ -34,6 +34,12 @@ the Airflow team. any Airflow version from the ``Airflow 2`` line. There is no guarantee that it will work, but if it does, then you can use latest features from that image to build images for previous Airflow versions. +Airflow 3.0 +~~~~~~~~~~~ + + * The ``virtualenv`` package is no longer installed in the reference container. Airflow 3 and standard + provider relies on ``venv`` module from Python standard library. + Airflow 2.10 ~~~~~~~~~~~~ * The image does not support Debian-Bullseye(11) anymore. The image is based on Debian-Bookworm (12). diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json index 3d7e89e4fb947..ed1cd80edc9f4 100644 --- a/generated/provider_dependencies.json +++ b/generated/provider_dependencies.json @@ -1286,8 +1286,7 @@ "standard": { "deps": [ "apache-airflow-providers-common-sql>=1.18.0", - "apache-airflow>=2.8.0", - "virtualenv>=20.26.0" + "apache-airflow>=2.8.0" ], "devel-deps": [], "plugins": [], diff --git a/hatch_build.py b/hatch_build.py index bffe540f28830..7bc7cf2ccd1d7 100644 --- a/hatch_build.py +++ b/hatch_build.py @@ -137,9 +137,6 @@ "uv": [ "uv>=0.1.32", ], - "virtualenv": [ - "virtualenv>=20.26.0", - ], } DOC_EXTRAS: dict[str, list[str]] = { @@ -192,8 +189,7 @@ "click>=8.0", "gitpython>=3.1.40", "hatch>=1.9.1", - # Incremental 24.7.0, 24.7.1 has broken `python -m virtualenv` command when run in /opt/airflow directory - "incremental!=24.7.0,!=24.7.1,>=22.10.0", + "incremental>=24.7.2", "pipdeptree>=2.13.1", "pygithub>=2.1.1", "restructuredtext-lint>=1.4.0", diff --git a/newsfragments/43568.significant.rst b/newsfragments/43568.significant.rst new file mode 100644 index 0000000000000..450ca5b4a27f9 --- /dev/null +++ b/newsfragments/43568.significant.rst @@ -0,0 +1 @@ +Remove ``virtualenv`` extra as PythonVirtualenvOperator has been moved to standard provider and switched to use built-in venv package. 
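For users of the dropped ``virtualenv`` extra: ``pip install 'apache-airflow[virtualenv]'`` is no longer needed, and existing ``PythonVirtualenvOperator`` / ``@task.virtualenv`` code should keep working unchanged, because the environment is now created with the standard-library ``venv`` module. A minimal usage sketch (the requirement pin is taken from the example DAGs in this patch and is illustrative only)::

    from airflow.decorators import task

    @task.virtualenv(requirements=["colorama==0.4.0"], system_site_packages=False)
    def callable_virtualenv():
        # Import inside the function so the module is only resolved inside the
        # freshly created environment where colorama is installed.
        from colorama import Fore

        print(Fore.RED + "running inside a venv built with the stdlib venv module")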
diff --git a/providers/src/airflow/providers/standard/CHANGELOG.rst b/providers/src/airflow/providers/standard/CHANGELOG.rst index c359185f0ca2c..d24449605beff 100644 --- a/providers/src/airflow/providers/standard/CHANGELOG.rst +++ b/providers/src/airflow/providers/standard/CHANGELOG.rst @@ -42,3 +42,5 @@ Breaking changes ~~~~~~~~~~~~~~~~ * ``In BranchDayOfWeekOperator, DayOfWeekSensor, BranchDateTimeOperator parameter use_task_execution_date has been removed. Please use use_task_logical_date.`` +* ``PythonVirtualenvOperator uses built-in venv instead of virtualenv package.`` +* ``is_venv_installed method has been removed from PythonVirtualenvOperator as venv is built-in.`` diff --git a/providers/src/airflow/providers/standard/operators/python.py b/providers/src/airflow/providers/standard/operators/python.py index 4d908f6ba6809..fb4babaf4aa2a 100644 --- a/providers/src/airflow/providers/standard/operators/python.py +++ b/providers/src/airflow/providers/standard/operators/python.py @@ -17,7 +17,6 @@ # under the License. from __future__ import annotations -import importlib import inspect import json import logging @@ -73,17 +72,6 @@ from airflow.utils.context import Context -def is_venv_installed() -> bool: - """ - Check if the virtualenv package is installed via checking if it is on the path or installed as package. - - :return: True if it is. Whichever way of checking it works, is fine. - """ - if shutil.which("virtualenv") or importlib.util.find_spec("virtualenv"): - return True - return False - - @cache def _parse_version_info(text: str) -> tuple[int, int, int, str, int]: """Parse python version info from a text.""" @@ -710,8 +698,6 @@ def __init__( "Passing non-string types (e.g. int or float) as python_version not supported" ) - if not is_venv_installed(): - raise AirflowException("PythonVirtualenvOperator requires virtualenv, please install it.") if use_airflow_context and (not expect_airflow and not system_site_packages): error_msg = "use_airflow_context is set to True, but expect_airflow and system_site_packages are set to False." 
raise AirflowException(error_msg) diff --git a/providers/src/airflow/providers/standard/provider.yaml b/providers/src/airflow/providers/standard/provider.yaml index be6467f73c0d5..ee069165dfe38 100644 --- a/providers/src/airflow/providers/standard/provider.yaml +++ b/providers/src/airflow/providers/standard/provider.yaml @@ -30,7 +30,6 @@ versions: dependencies: - apache-airflow>=2.8.0 - apache-airflow-providers-common-sql>=1.18.0 - - virtualenv>=20.26.0 integrations: - integration-name: Standard diff --git a/providers/src/airflow/providers/standard/utils/python_virtualenv.py b/providers/src/airflow/providers/standard/utils/python_virtualenv.py index 00615d70f0f1c..b9c7be4c2832d 100644 --- a/providers/src/airflow/providers/standard/utils/python_virtualenv.py +++ b/providers/src/airflow/providers/standard/utils/python_virtualenv.py @@ -29,12 +29,13 @@ from airflow.utils.process_utils import execute_in_subprocess -def _generate_virtualenv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool) -> list[str]: - cmd = [sys.executable, "-m", "virtualenv", tmp_dir] +def _generate_venv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool) -> list[str]: + """We are using venv command instead of venv module to allow creation of venv for different python versions.""" + if python_bin is None: + python_bin = sys.executable + cmd = [python_bin, "-m", "venv", tmp_dir] if system_site_packages: cmd.append("--system-site-packages") - if python_bin is not None: - cmd.append(f"--python={python_bin}") return cmd @@ -90,8 +91,8 @@ def prepare_virtualenv( if index_urls is not None: _generate_pip_conf(Path(venv_directory) / "pip.conf", index_urls) - virtualenv_cmd = _generate_virtualenv_cmd(venv_directory, python_bin, system_site_packages) - execute_in_subprocess(virtualenv_cmd) + venv_cmd = _generate_venv_cmd(venv_directory, python_bin, system_site_packages) + execute_in_subprocess(venv_cmd) if requirements is not None and requirements_file_path is not None: raise ValueError("Either requirements OR requirements_file_path has to be passed, but not both") diff --git a/providers/tests/standard/operators/test_python.py b/providers/tests/standard/operators/test_python.py index 1fd422d6c4f63..b8a8ef5bc1228 100644 --- a/providers/tests/standard/operators/test_python.py +++ b/providers/tests/standard/operators/test_python.py @@ -1129,18 +1129,6 @@ def default_kwargs(*, python_version=DEFAULT_PYTHON_VERSION, **kwargs): kwargs["venv_cache_path"] = venv_cache_path return kwargs - @mock.patch("shutil.which") - @mock.patch("airflow.providers.standard.operators.python.importlib") - def test_virtualenv_not_installed(self, importlib_mock, which_mock): - which_mock.return_value = None - importlib_mock.util.find_spec.return_value = None - - def f(): - pass - - with pytest.raises(AirflowException, match="requires virtualenv"): - self.run_as_task(f) - @CLOUDPICKLE_MARKER def test_add_cloudpickle(self): def f(): diff --git a/providers/tests/standard/utils/test_python_virtualenv.py b/providers/tests/standard/utils/test_python_virtualenv.py index b5f8321d1bda8..da9cf757d74f2 100644 --- a/providers/tests/standard/utils/test_python_virtualenv.py +++ b/providers/tests/standard/utils/test_python_virtualenv.py @@ -17,7 +17,6 @@ # under the License. 
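With the change above, ``prepare_virtualenv`` still issues the same two kinds of subprocess calls, one to create the environment and one to ``pip install`` any requirements; only the creation command differs, as the updated tests in the surrounding hunks assert. A rough sketch (the directory and interpreter names are illustrative)::

    from airflow.providers.standard.utils.python_virtualenv import prepare_virtualenv

    python_bin = prepare_virtualenv(
        venv_directory="/tmp/example_venv",
        python_bin="python3.11",  # target interpreter; sys.executable is used when None
        system_site_packages=True,
        requirements=["apache-beam[gcp]"],
    )
    # Roughly equivalent to running:
    #   python3.11 -m venv /tmp/example_venv --system-site-packages
    #   /tmp/example_venv/bin/pip install apache-beam[gcp]
    # and returns "/tmp/example_venv/bin/python"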
from __future__ import annotations -import sys from pathlib import Path from unittest import mock @@ -66,9 +65,7 @@ def test_should_create_virtualenv(self, mock_execute_in_subprocess): venv_directory="/VENV", python_bin="pythonVER", system_site_packages=False, requirements=[] ) assert "/VENV/bin/python" == python_bin - mock_execute_in_subprocess.assert_called_once_with( - [sys.executable, "-m", "virtualenv", "/VENV", "--python=pythonVER"] - ) + mock_execute_in_subprocess.assert_called_once_with(["pythonVER", "-m", "venv", "/VENV"]) @mock.patch("airflow.providers.standard.utils.python_virtualenv.execute_in_subprocess") def test_should_create_virtualenv_with_system_packages(self, mock_execute_in_subprocess): @@ -77,7 +74,7 @@ def test_should_create_virtualenv_with_system_packages(self, mock_execute_in_sub ) assert "/VENV/bin/python" == python_bin mock_execute_in_subprocess.assert_called_once_with( - [sys.executable, "-m", "virtualenv", "/VENV", "--system-site-packages", "--python=pythonVER"] + ["pythonVER", "-m", "venv", "/VENV", "--system-site-packages"] ) @mock.patch("airflow.providers.standard.utils.python_virtualenv.execute_in_subprocess") @@ -93,7 +90,7 @@ def test_pip_install_options(self, mock_execute_in_subprocess): assert "/VENV/bin/python" == python_bin mock_execute_in_subprocess.assert_any_call( - [sys.executable, "-m", "virtualenv", "/VENV", "--system-site-packages", "--python=pythonVER"] + ["pythonVER", "-m", "venv", "/VENV", "--system-site-packages"] ) mock_execute_in_subprocess.assert_called_with( ["/VENV/bin/pip", "install", *pip_install_options, "apache-beam[gcp]"] @@ -109,9 +106,7 @@ def test_should_create_virtualenv_with_extra_packages(self, mock_execute_in_subp ) assert "/VENV/bin/python" == python_bin - mock_execute_in_subprocess.assert_any_call( - [sys.executable, "-m", "virtualenv", "/VENV", "--python=pythonVER"] - ) + mock_execute_in_subprocess.assert_any_call(["pythonVER", "-m", "venv", "/VENV"]) mock_execute_in_subprocess.assert_called_with(["/VENV/bin/pip", "install", "apache-beam[gcp]"]) diff --git a/pyproject.toml b/pyproject.toml index 6e9c3a6f9718b..58e0a528858ba 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -78,8 +78,7 @@ dynamic = ["version", "optional-dependencies", "dependencies"] # START CORE EXTRAS HERE # # aiobotocore, apache-atlas, apache-webhdfs, async, cgroups, cloudpickle, github-enterprise, google- -# auth, graphviz, kerberos, ldap, leveldb, otel, pandas, password, rabbitmq, s3fs, sentry, statsd, uv, -# virtualenv +# auth, graphviz, kerberos, ldap, leveldb, otel, pandas, password, rabbitmq, s3fs, sentry, statsd, uv # # END CORE EXTRAS HERE # diff --git a/tests/decorators/test_python_virtualenv.py b/tests/decorators/test_python_virtualenv.py index 554b33ceb9b77..00964b00818f3 100644 --- a/tests/decorators/test_python_virtualenv.py +++ b/tests/decorators/test_python_virtualenv.py @@ -33,7 +33,7 @@ pytestmark = pytest.mark.db_test DEFAULT_DATE = timezone.datetime(2016, 1, 1) -PYTHON_VERSION = f"{sys.version_info.major}{sys.version_info.minor}" +PYTHON_VERSION = f"{sys.version_info.major}.{sys.version_info.minor}" DILL_INSTALLED = find_spec("dill") is not None DILL_MARKER = pytest.mark.skipif(not DILL_INSTALLED, reason="`dill` is not installed") CLOUDPICKLE_INSTALLED = find_spec("cloudpickle") is not None
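The final hunk also corrects ``PYTHON_VERSION`` in the decorator tests to a proper dotted version string (``"3.11"`` instead of ``"311"``). An illustrative sketch of requesting a specific interpreter version, assuming ``python_version`` is passed through to ``PythonVirtualenvOperator`` as before and that the matching interpreter (for example ``python3.11``) is installed on the worker, since the stdlib ``venv`` module can only build environments for an interpreter that is actually present::

    import sys

    from airflow.decorators import task

    # Dotted "major.minor" form, e.g. "3.11".
    PYTHON_VERSION = f"{sys.version_info.major}.{sys.version_info.minor}"

    @task.virtualenv(
        python_version=PYTHON_VERSION,
        requirements=["funcsigs"],
        system_site_packages=False,
    )
    def run_in_versioned_venv():
        import sys

        print(f"running under {sys.version}")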