diff --git a/INSTALL b/INSTALL index 7520cb88aff378..edf738f5e114da 100644 --- a/INSTALL +++ b/INSTALL @@ -259,8 +259,7 @@ Those extras are available as regular core airflow extras - they install optiona # START CORE EXTRAS HERE aiobotocore, apache-atlas, apache-webhdfs, async, cgroups, cloudpickle, github-enterprise, google- -auth, graphviz, kerberos, ldap, leveldb, otel, pandas, password, rabbitmq, s3fs, sentry, statsd, uv, -virtualenv +auth, graphviz, kerberos, ldap, leveldb, otel, pandas, password, rabbitmq, s3fs, sentry, statsd, uv # END CORE EXTRAS HERE diff --git a/airflow/example_dags/example_branch_operator.py b/airflow/example_dags/example_branch_operator.py index db79ac0c211a0d..35f7a9b548e56b 100644 --- a/airflow/example_dags/example_branch_operator.py +++ b/airflow/example_dags/example_branch_operator.py @@ -29,143 +29,140 @@ import pendulum -from airflow.providers.standard.operators.python import is_venv_installed - -if is_venv_installed(): - from airflow.models.dag import DAG - from airflow.operators.empty import EmptyOperator - from airflow.providers.standard.operators.python import ( - BranchExternalPythonOperator, - BranchPythonOperator, - BranchPythonVirtualenvOperator, - ExternalPythonOperator, - PythonOperator, - PythonVirtualenvOperator, +from airflow.models.dag import DAG +from airflow.operators.empty import EmptyOperator +from airflow.providers.standard.operators.python import ( + BranchExternalPythonOperator, + BranchPythonOperator, + BranchPythonVirtualenvOperator, + ExternalPythonOperator, + PythonOperator, + PythonVirtualenvOperator, +) +from airflow.utils.edgemodifier import Label +from airflow.utils.trigger_rule import TriggerRule + +PATH_TO_PYTHON_BINARY = sys.executable + +with DAG( + dag_id="example_branch_operator", + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + catchup=False, + schedule="@daily", + tags=["example", "example2"], + orientation="TB", +) as dag: + run_this_first = EmptyOperator( + task_id="run_this_first", ) - from airflow.utils.edgemodifier import Label - from airflow.utils.trigger_rule import TriggerRule - - PATH_TO_PYTHON_BINARY = sys.executable - - with DAG( - dag_id="example_branch_operator", - start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), - catchup=False, - schedule="@daily", - tags=["example", "example2"], - orientation="TB", - ) as dag: - run_this_first = EmptyOperator( - task_id="run_this_first", - ) - options = ["a", "b", "c", "d"] + options = ["a", "b", "c", "d"] + + # Example branching on standard Python tasks - # Example branching on standard Python tasks + # [START howto_operator_branch_python] + branching = BranchPythonOperator( + task_id="branching", + python_callable=lambda: f"branch_{random.choice(options)}", + ) + # [END howto_operator_branch_python] + run_this_first >> branching + + join = EmptyOperator( + task_id="join", + trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS, + ) - # [START howto_operator_branch_python] - branching = BranchPythonOperator( - task_id="branching", - python_callable=lambda: f"branch_{random.choice(options)}", + for option in options: + t = PythonOperator( + task_id=f"branch_{option}", + python_callable=lambda: print("Hello World"), ) - # [END howto_operator_branch_python] - run_this_first >> branching - join = EmptyOperator( - task_id="join", - trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS, + empty_follow = EmptyOperator( + task_id="follow_" + option, ) - for option in options: - t = PythonOperator( - task_id=f"branch_{option}", - python_callable=lambda: print("Hello 
World"), - ) + # Label is optional here, but it can help identify more complex branches + branching >> Label(option) >> t >> empty_follow >> join - empty_follow = EmptyOperator( - task_id="follow_" + option, - ) + # Example the same with external Python calls - # Label is optional here, but it can help identify more complex branches - branching >> Label(option) >> t >> empty_follow >> join + # [START howto_operator_branch_ext_py] + def branch_with_external_python(choices): + import random - # Example the same with external Python calls + return f"ext_py_{random.choice(choices)}" - # [START howto_operator_branch_ext_py] - def branch_with_external_python(choices): - import random + branching_ext_py = BranchExternalPythonOperator( + task_id="branching_ext_python", + python=PATH_TO_PYTHON_BINARY, + python_callable=branch_with_external_python, + op_args=[options], + ) + # [END howto_operator_branch_ext_py] + join >> branching_ext_py + + join_ext_py = EmptyOperator( + task_id="join_ext_python", + trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS, + ) - return f"ext_py_{random.choice(choices)}" + def hello_world_with_external_python(): + print("Hello World from external Python") - branching_ext_py = BranchExternalPythonOperator( - task_id="branching_ext_python", + for option in options: + t = ExternalPythonOperator( + task_id=f"ext_py_{option}", python=PATH_TO_PYTHON_BINARY, - python_callable=branch_with_external_python, - op_args=[options], + python_callable=hello_world_with_external_python, ) - # [END howto_operator_branch_ext_py] - join >> branching_ext_py - join_ext_py = EmptyOperator( - task_id="join_ext_python", - trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS, - ) + # Label is optional here, but it can help identify more complex branches + branching_ext_py >> Label(option) >> t >> join_ext_py - def hello_world_with_external_python(): - print("Hello World from external Python") + # Example the same with Python virtual environments - for option in options: - t = ExternalPythonOperator( - task_id=f"ext_py_{option}", - python=PATH_TO_PYTHON_BINARY, - python_callable=hello_world_with_external_python, - ) + # [START howto_operator_branch_virtualenv] + # Note: Passing a caching dir allows to keep the virtual environment over multiple runs + # Run the example a second time and see that it re-uses it and is faster. + VENV_CACHE_PATH = Path(tempfile.gettempdir()) - # Label is optional here, but it can help identify more complex branches - branching_ext_py >> Label(option) >> t >> join_ext_py + def branch_with_venv(choices): + import random - # Example the same with Python virtual environments + import numpy as np - # [START howto_operator_branch_virtualenv] - # Note: Passing a caching dir allows to keep the virtual environment over multiple runs - # Run the example a second time and see that it re-uses it and is faster. 
- VENV_CACHE_PATH = Path(tempfile.gettempdir()) + print(f"Some numpy stuff: {np.arange(6)}") + return f"venv_{random.choice(choices)}" - def branch_with_venv(choices): - import random + branching_venv = BranchPythonVirtualenvOperator( + task_id="branching_venv", + requirements=["numpy~=1.26.0"], + venv_cache_path=VENV_CACHE_PATH, + python_callable=branch_with_venv, + op_args=[options], + ) + # [END howto_operator_branch_virtualenv] + join_ext_py >> branching_venv + + join_venv = EmptyOperator( + task_id="join_venv", + trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS, + ) - import numpy as np + def hello_world_with_venv(): + import numpy as np - print(f"Some numpy stuff: {np.arange(6)}") - return f"venv_{random.choice(choices)}" + print(f"Hello World with some numpy stuff: {np.arange(6)}") - branching_venv = BranchPythonVirtualenvOperator( - task_id="branching_venv", + for option in options: + t = PythonVirtualenvOperator( + task_id=f"venv_{option}", requirements=["numpy~=1.26.0"], venv_cache_path=VENV_CACHE_PATH, - python_callable=branch_with_venv, - op_args=[options], - ) - # [END howto_operator_branch_virtualenv] - join_ext_py >> branching_venv - - join_venv = EmptyOperator( - task_id="join_venv", - trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS, + python_callable=hello_world_with_venv, ) - def hello_world_with_venv(): - import numpy as np - - print(f"Hello World with some numpy stuff: {np.arange(6)}") - - for option in options: - t = PythonVirtualenvOperator( - task_id=f"venv_{option}", - requirements=["numpy~=1.26.0"], - venv_cache_path=VENV_CACHE_PATH, - python_callable=hello_world_with_venv, - ) - - # Label is optional here, but it can help identify more complex branches - branching_venv >> Label(option) >> t >> join_venv + # Label is optional here, but it can help identify more complex branches + branching_venv >> Label(option) >> t >> join_venv diff --git a/airflow/example_dags/example_branch_operator_decorator.py b/airflow/example_dags/example_branch_operator_decorator.py index 6777db38a9b6c3..e9b3bea97a72f8 100644 --- a/airflow/example_dags/example_branch_operator_decorator.py +++ b/airflow/example_dags/example_branch_operator_decorator.py @@ -30,121 +30,116 @@ import pendulum -from airflow.providers.standard.operators.python import is_venv_installed +from airflow.decorators import task +from airflow.models.dag import DAG +from airflow.operators.empty import EmptyOperator +from airflow.utils.edgemodifier import Label +from airflow.utils.trigger_rule import TriggerRule -if is_venv_installed(): - from airflow.decorators import task - from airflow.models.dag import DAG - from airflow.operators.empty import EmptyOperator - from airflow.utils.edgemodifier import Label - from airflow.utils.trigger_rule import TriggerRule +PATH_TO_PYTHON_BINARY = sys.executable - PATH_TO_PYTHON_BINARY = sys.executable +with DAG( + dag_id="example_branch_python_operator_decorator", + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + catchup=False, + schedule="@daily", + tags=["example", "example2"], + orientation="TB", +) as dag: + run_this_first = EmptyOperator(task_id="run_this_first") - with DAG( - dag_id="example_branch_python_operator_decorator", - start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), - catchup=False, - schedule="@daily", - tags=["example", "example2"], - orientation="TB", - ) as dag: - run_this_first = EmptyOperator(task_id="run_this_first") + options = ["a", "b", "c", "d"] - options = ["a", "b", "c", "d"] + # Example branching on standard Python tasks - # Example 
branching on standard Python tasks + # [START howto_operator_branch_python] + @task.branch() + def branching(choices: list[str]) -> str: + return f"branch_{random.choice(choices)}" - # [START howto_operator_branch_python] - @task.branch() - def branching(choices: list[str]) -> str: - return f"branch_{random.choice(choices)}" + # [END howto_operator_branch_python] - # [END howto_operator_branch_python] + random_choice_instance = branching(choices=options) - random_choice_instance = branching(choices=options) + run_this_first >> random_choice_instance - run_this_first >> random_choice_instance + join = EmptyOperator(task_id="join", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) - join = EmptyOperator(task_id="join", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) + for option in options: - for option in options: + @task(task_id=f"branch_{option}") + def some_task(): + print("doing something in Python") - @task(task_id=f"branch_{option}") - def some_task(): - print("doing something in Python") + t = some_task() + empty = EmptyOperator(task_id=f"follow_{option}") - t = some_task() - empty = EmptyOperator(task_id=f"follow_{option}") + # Label is optional here, but it can help identify more complex branches + random_choice_instance >> Label(option) >> t >> empty >> join - # Label is optional here, but it can help identify more complex branches - random_choice_instance >> Label(option) >> t >> empty >> join + # Example the same with external Python calls - # Example the same with external Python calls + # [START howto_operator_branch_ext_py] + @task.branch_external_python(python=PATH_TO_PYTHON_BINARY) + def branching_ext_python(choices) -> str: + import random - # [START howto_operator_branch_ext_py] - @task.branch_external_python(python=PATH_TO_PYTHON_BINARY) - def branching_ext_python(choices) -> str: - import random + return f"ext_py_{random.choice(choices)}" - return f"ext_py_{random.choice(choices)}" + # [END howto_operator_branch_ext_py] - # [END howto_operator_branch_ext_py] + random_choice_ext_py = branching_ext_python(choices=options) - random_choice_ext_py = branching_ext_python(choices=options) + join >> random_choice_ext_py - join >> random_choice_ext_py + join_ext_py = EmptyOperator(task_id="join_ext_py", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) - join_ext_py = EmptyOperator( - task_id="join_ext_py", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS - ) - - for option in options: + for option in options: - @task.external_python(task_id=f"ext_py_{option}", python=PATH_TO_PYTHON_BINARY) - def some_ext_py_task(): - print("doing something in external Python") + @task.external_python(task_id=f"ext_py_{option}", python=PATH_TO_PYTHON_BINARY) + def some_ext_py_task(): + print("doing something in external Python") - t = some_ext_py_task() + t = some_ext_py_task() - # Label is optional here, but it can help identify more complex branches - random_choice_ext_py >> Label(option) >> t >> join_ext_py + # Label is optional here, but it can help identify more complex branches + random_choice_ext_py >> Label(option) >> t >> join_ext_py - # Example the same with Python virtual environments + # Example the same with Python virtual environments - # [START howto_operator_branch_virtualenv] - # Note: Passing a caching dir allows to keep the virtual environment over multiple runs - # Run the example a second time and see that it re-uses it and is faster. 
- VENV_CACHE_PATH = tempfile.gettempdir() + # [START howto_operator_branch_virtualenv] + # Note: Passing a caching dir allows to keep the virtual environment over multiple runs + # Run the example a second time and see that it re-uses it and is faster. + VENV_CACHE_PATH = tempfile.gettempdir() - @task.branch_virtualenv(requirements=["numpy~=1.24.4"], venv_cache_path=VENV_CACHE_PATH) - def branching_virtualenv(choices) -> str: - import random + @task.branch_virtualenv(requirements=["numpy~=1.24.4"], venv_cache_path=VENV_CACHE_PATH) + def branching_virtualenv(choices) -> str: + import random - import numpy as np + import numpy as np - print(f"Some numpy stuff: {np.arange(6)}") - return f"venv_{random.choice(choices)}" + print(f"Some numpy stuff: {np.arange(6)}") + return f"venv_{random.choice(choices)}" - # [END howto_operator_branch_virtualenv] + # [END howto_operator_branch_virtualenv] - random_choice_venv = branching_virtualenv(choices=options) + random_choice_venv = branching_virtualenv(choices=options) - join_ext_py >> random_choice_venv + join_ext_py >> random_choice_venv - join_venv = EmptyOperator(task_id="join_venv", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) + join_venv = EmptyOperator(task_id="join_venv", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) - for option in options: + for option in options: - @task.virtualenv( - task_id=f"venv_{option}", requirements=["numpy~=1.24.4"], venv_cache_path=VENV_CACHE_PATH - ) - def some_venv_task(): - import numpy as np + @task.virtualenv( + task_id=f"venv_{option}", requirements=["numpy~=1.24.4"], venv_cache_path=VENV_CACHE_PATH + ) + def some_venv_task(): + import numpy as np - print(f"Some numpy stuff: {np.arange(6)}") + print(f"Some numpy stuff: {np.arange(6)}") - t = some_venv_task() + t = some_venv_task() - # Label is optional here, but it can help identify more complex branches - random_choice_venv >> Label(option) >> t >> join_venv + # Label is optional here, but it can help identify more complex branches + random_choice_venv >> Label(option) >> t >> join_venv diff --git a/airflow/example_dags/example_python_decorator.py b/airflow/example_dags/example_python_decorator.py index 62de60fd6987b4..7619bc3b6a5175 100644 --- a/airflow/example_dags/example_python_decorator.py +++ b/airflow/example_dags/example_python_decorator.py @@ -30,7 +30,6 @@ import pendulum from airflow.decorators import dag, task -from airflow.providers.standard.operators.python import is_venv_installed log = logging.getLogger(__name__) @@ -76,61 +75,58 @@ def my_sleeping_function(random_base): run_this >> log_the_sql >> sleeping_task # [END howto_operator_python_kwargs] - if not is_venv_installed(): - log.warning("The virtalenv_python example task requires virtualenv, please install it.") - else: - # [START howto_operator_python_venv] - @task.virtualenv( - task_id="virtualenv_python", requirements=["colorama==0.4.0"], system_site_packages=False - ) - def callable_virtualenv(): - """ - Example function that will be performed in a virtual environment. - - Importing at the module level ensures that it will not attempt to import the - library before it is installed. 
- """ - from time import sleep - - from colorama import Back, Fore, Style - - print(Fore.RED + "some red text") - print(Back.GREEN + "and with a green background") - print(Style.DIM + "and in dim text") - print(Style.RESET_ALL) - for _ in range(4): - print(Style.DIM + "Please wait...", flush=True) - sleep(1) - print("Finished") - - virtualenv_task = callable_virtualenv() - # [END howto_operator_python_venv] - - sleeping_task >> virtualenv_task - - # [START howto_operator_external_python] - @task.external_python(task_id="external_python", python=PATH_TO_PYTHON_BINARY) - def callable_external_python(): - """ - Example function that will be performed in a virtual environment. - - Importing at the module level ensures that it will not attempt to import the - library before it is installed. - """ - import sys - from time import sleep - - print(f"Running task via {sys.executable}") - print("Sleeping") - for _ in range(4): - print("Please wait...", flush=True) - sleep(1) - print("Finished") - - external_python_task = callable_external_python() - # [END howto_operator_external_python] - - run_this >> external_python_task >> virtualenv_task + # [START howto_operator_python_venv] + @task.virtualenv( + task_id="virtualenv_python", requirements=["colorama==0.4.0"], system_site_packages=False + ) + def callable_virtualenv(): + """ + Example function that will be performed in a virtual environment. + + Importing at the module level ensures that it will not attempt to import the + library before it is installed. + """ + from time import sleep + + from colorama import Back, Fore, Style + + print(Fore.RED + "some red text") + print(Back.GREEN + "and with a green background") + print(Style.DIM + "and in dim text") + print(Style.RESET_ALL) + for _ in range(4): + print(Style.DIM + "Please wait...", flush=True) + sleep(1) + print("Finished") + + virtualenv_task = callable_virtualenv() + # [END howto_operator_python_venv] + + sleeping_task >> virtualenv_task + + # [START howto_operator_external_python] + @task.external_python(task_id="external_python", python=PATH_TO_PYTHON_BINARY) + def callable_external_python(): + """ + Example function that will be performed in a virtual environment. + + Importing at the module level ensures that it will not attempt to import the + library before it is installed. + """ + import sys + from time import sleep + + print(f"Running task via {sys.executable}") + print("Sleeping") + for _ in range(4): + print("Please wait...", flush=True) + sleep(1) + print("Finished") + + external_python_task = callable_external_python() + # [END howto_operator_external_python] + + run_this >> external_python_task >> virtualenv_task example_python_decorator() diff --git a/airflow/example_dags/example_python_operator.py b/airflow/example_dags/example_python_operator.py index 8452dfb9358be5..976813d53fd984 100644 --- a/airflow/example_dags/example_python_operator.py +++ b/airflow/example_dags/example_python_operator.py @@ -34,7 +34,6 @@ ExternalPythonOperator, PythonOperator, PythonVirtualenvOperator, - is_venv_installed, ) log = logging.getLogger(__name__) @@ -89,63 +88,60 @@ def my_sleeping_function(random_base): run_this >> log_the_sql >> sleeping_task # [END howto_operator_python_kwargs] - if not is_venv_installed(): - log.warning("The virtalenv_python example task requires virtualenv, please install it.") - else: - # [START howto_operator_python_venv] - def callable_virtualenv(): - """ - Example function that will be performed in a virtual environment. 
- - Importing at the function level ensures that it will not attempt to import the - library before it is installed. - """ - from time import sleep - - from colorama import Back, Fore, Style - - print(Fore.RED + "some red text") - print(Back.GREEN + "and with a green background") - print(Style.DIM + "and in dim text") - print(Style.RESET_ALL) - for _ in range(4): - print(Style.DIM + "Please wait...", flush=True) - sleep(1) - print("Finished") - - virtualenv_task = PythonVirtualenvOperator( - task_id="virtualenv_python", - python_callable=callable_virtualenv, - requirements=["colorama==0.4.0"], - system_site_packages=False, - ) - # [END howto_operator_python_venv] - - sleeping_task >> virtualenv_task - - # [START howto_operator_external_python] - def callable_external_python(): - """ - Example function that will be performed in a virtual environment. - - Importing at the module level ensures that it will not attempt to import the - library before it is installed. - """ - import sys - from time import sleep - - print(f"Running task via {sys.executable}") - print("Sleeping") - for _ in range(4): - print("Please wait...", flush=True) - sleep(1) - print("Finished") - - external_python_task = ExternalPythonOperator( - task_id="external_python", - python_callable=callable_external_python, - python=PATH_TO_PYTHON_BINARY, - ) - # [END howto_operator_external_python] + # [START howto_operator_python_venv] + def callable_virtualenv(): + """ + Example function that will be performed in a virtual environment. + + Importing at the function level ensures that it will not attempt to import the + library before it is installed. + """ + from time import sleep + + from colorama import Back, Fore, Style + + print(Fore.RED + "some red text") + print(Back.GREEN + "and with a green background") + print(Style.DIM + "and in dim text") + print(Style.RESET_ALL) + for _ in range(4): + print(Style.DIM + "Please wait...", flush=True) + sleep(1) + print("Finished") + + virtualenv_task = PythonVirtualenvOperator( + task_id="virtualenv_python", + python_callable=callable_virtualenv, + requirements=["colorama==0.4.0"], + system_site_packages=False, + ) + # [END howto_operator_python_venv] + + sleeping_task >> virtualenv_task + + # [START howto_operator_external_python] + def callable_external_python(): + """ + Example function that will be performed in a virtual environment. + + Importing at the module level ensures that it will not attempt to import the + library before it is installed. 
+ """ + import sys + from time import sleep + + print(f"Running task via {sys.executable}") + print("Sleeping") + for _ in range(4): + print("Please wait...", flush=True) + sleep(1) + print("Finished") + + external_python_task = ExternalPythonOperator( + task_id="external_python", + python_callable=callable_external_python, + python=PATH_TO_PYTHON_BINARY, + ) + # [END howto_operator_external_python] - run_this >> external_python_task >> virtualenv_task + run_this >> external_python_task >> virtualenv_task diff --git a/airflow/example_dags/tutorial_taskflow_api_virtualenv.py b/airflow/example_dags/tutorial_taskflow_api_virtualenv.py index 357a1e7b290a9e..fd9ee3e7b5abd1 100644 --- a/airflow/example_dags/tutorial_taskflow_api_virtualenv.py +++ b/airflow/example_dags/tutorial_taskflow_api_virtualenv.py @@ -21,67 +21,64 @@ from datetime import datetime from airflow.decorators import dag, task -from airflow.providers.standard.operators.python import is_venv_installed log = logging.getLogger(__name__) -if not is_venv_installed(): - log.warning("The tutorial_taskflow_api_virtualenv example DAG requires virtualenv, please install it.") -else: - @dag(schedule=None, start_date=datetime(2021, 1, 1), catchup=False, tags=["example"]) - def tutorial_taskflow_api_virtualenv(): +@dag(schedule=None, start_date=datetime(2021, 1, 1), catchup=False, tags=["example"]) +def tutorial_taskflow_api_virtualenv(): + """ + ### TaskFlow API example using virtualenv + This is a simple data pipeline example which demonstrates the use of + the TaskFlow API using three simple tasks for Extract, Transform, and Load. + """ + + @task.virtualenv( + serializer="dill", # Use `dill` for advanced serialization. + system_site_packages=False, + requirements=["funcsigs"], + ) + def extract(): + """ + #### Extract task + A simple Extract task to get data ready for the rest of the data + pipeline. In this case, getting data is simulated by reading from a + hardcoded JSON string. + """ + import json + + data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}' + + order_data_dict = json.loads(data_string) + return order_data_dict + + @task(multiple_outputs=True) + def transform(order_data_dict: dict): + """ + #### Transform task + A simple Transform task which takes in the collection of order data and + computes the total order value. """ - ### TaskFlow API example using virtualenv - This is a simple data pipeline example which demonstrates the use of - the TaskFlow API using three simple tasks for Extract, Transform, and Load. + total_order_value = 0 + + for value in order_data_dict.values(): + total_order_value += value + + return {"total_order_value": total_order_value} + + @task() + def load(total_order_value: float): + """ + #### Load task + A simple Load task which takes in the result of the Transform task and + instead of saving it to end user review, just prints it out. """ - @task.virtualenv( - serializer="dill", # Use `dill` for advanced serialization. - system_site_packages=False, - requirements=["funcsigs"], - ) - def extract(): - """ - #### Extract task - A simple Extract task to get data ready for the rest of the data - pipeline. In this case, getting data is simulated by reading from a - hardcoded JSON string. 
- """ - import json - - data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}' - - order_data_dict = json.loads(data_string) - return order_data_dict - - @task(multiple_outputs=True) - def transform(order_data_dict: dict): - """ - #### Transform task - A simple Transform task which takes in the collection of order data and - computes the total order value. - """ - total_order_value = 0 - - for value in order_data_dict.values(): - total_order_value += value - - return {"total_order_value": total_order_value} - - @task() - def load(total_order_value: float): - """ - #### Load task - A simple Load task which takes in the result of the Transform task and - instead of saving it to end user review, just prints it out. - """ - - print(f"Total order value is: {total_order_value:.2f}") - - order_data = extract() - order_summary = transform(order_data) - load(order_summary["total_order_value"]) - - tutorial_dag = tutorial_taskflow_api_virtualenv() + print(f"Total order value is: {total_order_value:.2f}") + + order_data = extract() + order_summary = transform(order_data) + load(order_summary["total_order_value"]) + + +tutorial_dag = tutorial_taskflow_api_virtualenv() diff --git a/contributing-docs/12_airflow_dependencies_and_extras.rst b/contributing-docs/12_airflow_dependencies_and_extras.rst index 8bfbdb630c9f78..b21358db017f4c 100644 --- a/contributing-docs/12_airflow_dependencies_and_extras.rst +++ b/contributing-docs/12_airflow_dependencies_and_extras.rst @@ -165,8 +165,7 @@ Those extras are available as regular core airflow extras - they install optiona .. START CORE EXTRAS HERE aiobotocore, apache-atlas, apache-webhdfs, async, cgroups, cloudpickle, github-enterprise, google- -auth, graphviz, kerberos, ldap, leveldb, otel, pandas, password, rabbitmq, s3fs, sentry, statsd, uv, -virtualenv +auth, graphviz, kerberos, ldap, leveldb, otel, pandas, password, rabbitmq, s3fs, sentry, statsd, uv .. END CORE EXTRAS HERE diff --git a/dev/breeze/doc/images/output_prod-image_build.txt b/dev/breeze/doc/images/output_prod-image_build.txt index 9c6c509a66902a..8610d58e7ac25d 100644 --- a/dev/breeze/doc/images/output_prod-image_build.txt +++ b/dev/breeze/doc/images/output_prod-image_build.txt @@ -1 +1 @@ -fe048412f9fc1527a30eaaf0a986fa16 +3576058438d009eb3dc8be53ea16be8a diff --git a/dev/breeze/src/airflow_breeze/global_constants.py b/dev/breeze/src/airflow_breeze/global_constants.py index 43875268071e21..31b44a68a83cb6 100644 --- a/dev/breeze/src/airflow_breeze/global_constants.py +++ b/dev/breeze/src/airflow_breeze/global_constants.py @@ -554,7 +554,6 @@ def get_airflow_extras(): "ssh", "statsd", "uv", - "virtualenv", # END OF EXTRAS LIST UPDATED BY PRE COMMIT ] diff --git a/docker_tests/test_prod_image.py b/docker_tests/test_prod_image.py index a11ba9495db802..95c284dec1fe7c 100644 --- a/docker_tests/test_prod_image.py +++ b/docker_tests/test_prod_image.py @@ -171,7 +171,6 @@ def test_pip_dependencies_conflict(self, default_docker_image): "sftp/ssh": ["paramiko", "sshtunnel"], "slack": ["slack_sdk"], "statsd": ["statsd"], - "virtualenv": ["virtualenv"], } @pytest.mark.skipif(os.environ.get("TEST_SLIM_IMAGE") == "true", reason="Skipped with slim image") diff --git a/docs/apache-airflow/extra-packages-ref.rst b/docs/apache-airflow/extra-packages-ref.rst index 92ab5b7ad83928..5d54a94a42217c 100644 --- a/docs/apache-airflow/extra-packages-ref.rst +++ b/docs/apache-airflow/extra-packages-ref.rst @@ -87,8 +87,6 @@ python dependencies for the provided package. 
 +---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
 | uv | ``pip install 'apache-airflow[uv]'`` | Install uv - fast, Rust-based package installer (experimental) |
 +---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
-| virtualenv | ``pip install 'apache-airflow[virtualenv]'`` | Running python tasks in local virtualenv |
-+---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
 | cloudpickle | pip install apache-airflow[cloudpickle] | Cloudpickle hooks and operators |
 +---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
 
diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json
index 3d7e89e4fb947c..ed1cd80edc9f47 100644
--- a/generated/provider_dependencies.json
+++ b/generated/provider_dependencies.json
@@ -1286,8 +1286,7 @@
   "standard": {
     "deps": [
       "apache-airflow-providers-common-sql>=1.18.0",
-      "apache-airflow>=2.8.0",
-      "virtualenv>=20.26.0"
+      "apache-airflow>=2.8.0"
     ],
     "devel-deps": [],
     "plugins": [],
diff --git a/hatch_build.py b/hatch_build.py
index bffe540f28830d..ff091428b25209 100644
--- a/hatch_build.py
+++ b/hatch_build.py
@@ -137,9 +137,6 @@
     "uv": [
         "uv>=0.1.32",
     ],
-    "virtualenv": [
-        "virtualenv>=20.26.0",
-    ],
 }
 
 DOC_EXTRAS: dict[str, list[str]] = {
diff --git a/newsfragments/43568.significant.rst b/newsfragments/43568.significant.rst
new file mode 100644
index 00000000000000..ebede980234a1c
--- /dev/null
+++ b/newsfragments/43568.significant.rst
@@ -0,0 +1 @@
+Remove the ``virtualenv`` extra, as ``PythonVirtualenvOperator`` has been moved to the standard provider and now uses the built-in ``venv`` package.
diff --git a/providers/src/airflow/providers/standard/CHANGELOG.rst b/providers/src/airflow/providers/standard/CHANGELOG.rst
index c359185f0ca2c5..d24449605beff3 100644
--- a/providers/src/airflow/providers/standard/CHANGELOG.rst
+++ b/providers/src/airflow/providers/standard/CHANGELOG.rst
@@ -42,3 +42,5 @@ Breaking changes
 ~~~~~~~~~~~~~~~~
 
 * ``In BranchDayOfWeekOperator, DayOfWeekSensor, BranchDateTimeOperator parameter use_task_execution_date has been removed. Please use use_task_logical_date.``
+* ``PythonVirtualenvOperator now uses the built-in venv module instead of the virtualenv package.``
+* ``The is_venv_installed helper function has been removed, as the built-in venv module is always available.``
diff --git a/providers/src/airflow/providers/standard/operators/python.py b/providers/src/airflow/providers/standard/operators/python.py
index 4d908f6ba68092..fb4babaf4aa2a8 100644
--- a/providers/src/airflow/providers/standard/operators/python.py
+++ b/providers/src/airflow/providers/standard/operators/python.py
@@ -17,7 +17,6 @@
 # under the License.
 from __future__ import annotations
 
-import importlib
 import inspect
 import json
 import logging
@@ -73,17 +72,6 @@
     from airflow.utils.context import Context
 
 
-def is_venv_installed() -> bool:
-    """
-    Check if the virtualenv package is installed via checking if it is on the path or installed as package.
-
-    :return: True if it is. Whichever way of checking it works, is fine.
- """ - if shutil.which("virtualenv") or importlib.util.find_spec("virtualenv"): - return True - return False - - @cache def _parse_version_info(text: str) -> tuple[int, int, int, str, int]: """Parse python version info from a text.""" @@ -710,8 +698,6 @@ def __init__( "Passing non-string types (e.g. int or float) as python_version not supported" ) - if not is_venv_installed(): - raise AirflowException("PythonVirtualenvOperator requires virtualenv, please install it.") if use_airflow_context and (not expect_airflow and not system_site_packages): error_msg = "use_airflow_context is set to True, but expect_airflow and system_site_packages are set to False." raise AirflowException(error_msg) diff --git a/providers/src/airflow/providers/standard/provider.yaml b/providers/src/airflow/providers/standard/provider.yaml index be6467f73c0d5c..ee069165dfe38c 100644 --- a/providers/src/airflow/providers/standard/provider.yaml +++ b/providers/src/airflow/providers/standard/provider.yaml @@ -30,7 +30,6 @@ versions: dependencies: - apache-airflow>=2.8.0 - apache-airflow-providers-common-sql>=1.18.0 - - virtualenv>=20.26.0 integrations: - integration-name: Standard diff --git a/providers/src/airflow/providers/standard/utils/python_virtualenv.py b/providers/src/airflow/providers/standard/utils/python_virtualenv.py index 00615d70f0f1ce..b9c7be4c2832d7 100644 --- a/providers/src/airflow/providers/standard/utils/python_virtualenv.py +++ b/providers/src/airflow/providers/standard/utils/python_virtualenv.py @@ -29,12 +29,13 @@ from airflow.utils.process_utils import execute_in_subprocess -def _generate_virtualenv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool) -> list[str]: - cmd = [sys.executable, "-m", "virtualenv", tmp_dir] +def _generate_venv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool) -> list[str]: + """We are using venv command instead of venv module to allow creation of venv for different python versions.""" + if python_bin is None: + python_bin = sys.executable + cmd = [python_bin, "-m", "venv", tmp_dir] if system_site_packages: cmd.append("--system-site-packages") - if python_bin is not None: - cmd.append(f"--python={python_bin}") return cmd @@ -90,8 +91,8 @@ def prepare_virtualenv( if index_urls is not None: _generate_pip_conf(Path(venv_directory) / "pip.conf", index_urls) - virtualenv_cmd = _generate_virtualenv_cmd(venv_directory, python_bin, system_site_packages) - execute_in_subprocess(virtualenv_cmd) + venv_cmd = _generate_venv_cmd(venv_directory, python_bin, system_site_packages) + execute_in_subprocess(venv_cmd) if requirements is not None and requirements_file_path is not None: raise ValueError("Either requirements OR requirements_file_path has to be passed, but not both") diff --git a/providers/tests/standard/operators/test_python.py b/providers/tests/standard/operators/test_python.py index 1fd422d6c4f63b..b8a8ef5bc1228d 100644 --- a/providers/tests/standard/operators/test_python.py +++ b/providers/tests/standard/operators/test_python.py @@ -1129,18 +1129,6 @@ def default_kwargs(*, python_version=DEFAULT_PYTHON_VERSION, **kwargs): kwargs["venv_cache_path"] = venv_cache_path return kwargs - @mock.patch("shutil.which") - @mock.patch("airflow.providers.standard.operators.python.importlib") - def test_virtualenv_not_installed(self, importlib_mock, which_mock): - which_mock.return_value = None - importlib_mock.util.find_spec.return_value = None - - def f(): - pass - - with pytest.raises(AirflowException, match="requires virtualenv"): - self.run_as_task(f) - 
@CLOUDPICKLE_MARKER def test_add_cloudpickle(self): def f(): diff --git a/providers/tests/standard/utils/test_python_virtualenv.py b/providers/tests/standard/utils/test_python_virtualenv.py index b5f8321d1bda89..da9cf757d74f23 100644 --- a/providers/tests/standard/utils/test_python_virtualenv.py +++ b/providers/tests/standard/utils/test_python_virtualenv.py @@ -17,7 +17,6 @@ # under the License. from __future__ import annotations -import sys from pathlib import Path from unittest import mock @@ -66,9 +65,7 @@ def test_should_create_virtualenv(self, mock_execute_in_subprocess): venv_directory="/VENV", python_bin="pythonVER", system_site_packages=False, requirements=[] ) assert "/VENV/bin/python" == python_bin - mock_execute_in_subprocess.assert_called_once_with( - [sys.executable, "-m", "virtualenv", "/VENV", "--python=pythonVER"] - ) + mock_execute_in_subprocess.assert_called_once_with(["pythonVER", "-m", "venv", "/VENV"]) @mock.patch("airflow.providers.standard.utils.python_virtualenv.execute_in_subprocess") def test_should_create_virtualenv_with_system_packages(self, mock_execute_in_subprocess): @@ -77,7 +74,7 @@ def test_should_create_virtualenv_with_system_packages(self, mock_execute_in_sub ) assert "/VENV/bin/python" == python_bin mock_execute_in_subprocess.assert_called_once_with( - [sys.executable, "-m", "virtualenv", "/VENV", "--system-site-packages", "--python=pythonVER"] + ["pythonVER", "-m", "venv", "/VENV", "--system-site-packages"] ) @mock.patch("airflow.providers.standard.utils.python_virtualenv.execute_in_subprocess") @@ -93,7 +90,7 @@ def test_pip_install_options(self, mock_execute_in_subprocess): assert "/VENV/bin/python" == python_bin mock_execute_in_subprocess.assert_any_call( - [sys.executable, "-m", "virtualenv", "/VENV", "--system-site-packages", "--python=pythonVER"] + ["pythonVER", "-m", "venv", "/VENV", "--system-site-packages"] ) mock_execute_in_subprocess.assert_called_with( ["/VENV/bin/pip", "install", *pip_install_options, "apache-beam[gcp]"] @@ -109,9 +106,7 @@ def test_should_create_virtualenv_with_extra_packages(self, mock_execute_in_subp ) assert "/VENV/bin/python" == python_bin - mock_execute_in_subprocess.assert_any_call( - [sys.executable, "-m", "virtualenv", "/VENV", "--python=pythonVER"] - ) + mock_execute_in_subprocess.assert_any_call(["pythonVER", "-m", "venv", "/VENV"]) mock_execute_in_subprocess.assert_called_with(["/VENV/bin/pip", "install", "apache-beam[gcp]"]) diff --git a/pyproject.toml b/pyproject.toml index 6e9c3a6f9718b3..58e0a528858ba2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -78,8 +78,7 @@ dynamic = ["version", "optional-dependencies", "dependencies"] # START CORE EXTRAS HERE # # aiobotocore, apache-atlas, apache-webhdfs, async, cgroups, cloudpickle, github-enterprise, google- -# auth, graphviz, kerberos, ldap, leveldb, otel, pandas, password, rabbitmq, s3fs, sentry, statsd, uv, -# virtualenv +# auth, graphviz, kerberos, ldap, leveldb, otel, pandas, password, rabbitmq, s3fs, sentry, statsd, uv # # END CORE EXTRAS HERE # diff --git a/tests/decorators/test_python_virtualenv.py b/tests/decorators/test_python_virtualenv.py index 554b33ceb9b77b..00964b00818f3e 100644 --- a/tests/decorators/test_python_virtualenv.py +++ b/tests/decorators/test_python_virtualenv.py @@ -33,7 +33,7 @@ pytestmark = pytest.mark.db_test DEFAULT_DATE = timezone.datetime(2016, 1, 1) -PYTHON_VERSION = f"{sys.version_info.major}{sys.version_info.minor}" +PYTHON_VERSION = f"{sys.version_info.major}.{sys.version_info.minor}" DILL_INSTALLED = find_spec("dill") is 
not None DILL_MARKER = pytest.mark.skipif(not DILL_INSTALLED, reason="`dill` is not installed") CLOUDPICKLE_INSTALLED = find_spec("cloudpickle") is not None