Skip to content

Commit

Permalink
Switch PythonVirtualenvOperator to venv from virtualenv package
Browse files Browse the repository at this point in the history
The PythonVirtualenvOperator has been using the virtualenv package in order
to support Python 2.7 and Python versions before 3.5. We no longer need to
support those versions, so we can use the built-in venv module instead.

Fixes: apache#40420
  • Loading branch information
potiuk committed Nov 1, 2024
1 parent 2c67c9f commit cd39453
Show file tree
Hide file tree
Showing 25 changed files with 372 additions and 426 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
# much smaller.
#
# Use the same builder frontend version for everyone
ARG AIRFLOW_EXTRAS="aiobotocore,amazon,async,celery,cncf-kubernetes,common-io,docker,elasticsearch,fab,ftp,google,google-auth,graphviz,grpc,hashicorp,http,ldap,microsoft-azure,mysql,odbc,openlineage,pandas,postgres,redis,sendgrid,sftp,slack,snowflake,ssh,statsd,uv,virtualenv"
ARG AIRFLOW_EXTRAS="aiobotocore,amazon,async,celery,cncf-kubernetes,common-io,docker,elasticsearch,fab,ftp,google,google-auth,graphviz,grpc,hashicorp,http,ldap,microsoft-azure,mysql,odbc,openlineage,pandas,postgres,redis,sendgrid,sftp,slack,snowflake,ssh,statsd,uv"
ARG ADDITIONAL_AIRFLOW_EXTRAS=""
ARG ADDITIONAL_PYTHON_DEPS=""

Expand Down
3 changes: 1 addition & 2 deletions INSTALL
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,7 @@ Those extras are available as regular core airflow extras - they install optiona
# START CORE EXTRAS HERE

aiobotocore, apache-atlas, apache-webhdfs, async, cgroups, cloudpickle, github-enterprise, google-
auth, graphviz, kerberos, ldap, leveldb, otel, pandas, password, rabbitmq, s3fs, sentry, statsd, uv,
virtualenv
auth, graphviz, kerberos, ldap, leveldb, otel, pandas, password, rabbitmq, s3fs, sentry, statsd, uv

# END CORE EXTRAS HERE

Expand Down
217 changes: 107 additions & 110 deletions airflow/example_dags/example_branch_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,143 +29,140 @@

import pendulum

from airflow.providers.standard.operators.python import is_venv_installed

if is_venv_installed():
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.providers.standard.operators.python import (
BranchExternalPythonOperator,
BranchPythonOperator,
BranchPythonVirtualenvOperator,
ExternalPythonOperator,
PythonOperator,
PythonVirtualenvOperator,
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.providers.standard.operators.python import (
BranchExternalPythonOperator,
BranchPythonOperator,
BranchPythonVirtualenvOperator,
ExternalPythonOperator,
PythonOperator,
PythonVirtualenvOperator,
)
from airflow.utils.edgemodifier import Label
from airflow.utils.trigger_rule import TriggerRule

PATH_TO_PYTHON_BINARY = sys.executable

with DAG(
dag_id="example_branch_operator",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
schedule="@daily",
tags=["example", "example2"],
orientation="TB",
) as dag:
run_this_first = EmptyOperator(
task_id="run_this_first",
)
from airflow.utils.edgemodifier import Label
from airflow.utils.trigger_rule import TriggerRule

PATH_TO_PYTHON_BINARY = sys.executable

with DAG(
dag_id="example_branch_operator",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
schedule="@daily",
tags=["example", "example2"],
orientation="TB",
) as dag:
run_this_first = EmptyOperator(
task_id="run_this_first",
)

options = ["a", "b", "c", "d"]
options = ["a", "b", "c", "d"]

# Example branching on standard Python tasks

# Example branching on standard Python tasks
# [START howto_operator_branch_python]
branching = BranchPythonOperator(
task_id="branching",
python_callable=lambda: f"branch_{random.choice(options)}",
)
# [END howto_operator_branch_python]
run_this_first >> branching

join = EmptyOperator(
task_id="join",
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
)

# [START howto_operator_branch_python]
branching = BranchPythonOperator(
task_id="branching",
python_callable=lambda: f"branch_{random.choice(options)}",
for option in options:
t = PythonOperator(
task_id=f"branch_{option}",
python_callable=lambda: print("Hello World"),
)
# [END howto_operator_branch_python]
run_this_first >> branching

join = EmptyOperator(
task_id="join",
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
empty_follow = EmptyOperator(
task_id="follow_" + option,
)

for option in options:
t = PythonOperator(
task_id=f"branch_{option}",
python_callable=lambda: print("Hello World"),
)
# Label is optional here, but it can help identify more complex branches
branching >> Label(option) >> t >> empty_follow >> join

empty_follow = EmptyOperator(
task_id="follow_" + option,
)
# Example the same with external Python calls

# Label is optional here, but it can help identify more complex branches
branching >> Label(option) >> t >> empty_follow >> join
# [START howto_operator_branch_ext_py]
def branch_with_external_python(choices):
import random

# Example the same with external Python calls
return f"ext_py_{random.choice(choices)}"

# [START howto_operator_branch_ext_py]
def branch_with_external_python(choices):
import random
branching_ext_py = BranchExternalPythonOperator(
task_id="branching_ext_python",
python=PATH_TO_PYTHON_BINARY,
python_callable=branch_with_external_python,
op_args=[options],
)
# [END howto_operator_branch_ext_py]
join >> branching_ext_py

join_ext_py = EmptyOperator(
task_id="join_ext_python",
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
)

return f"ext_py_{random.choice(choices)}"
def hello_world_with_external_python():
print("Hello World from external Python")

branching_ext_py = BranchExternalPythonOperator(
task_id="branching_ext_python",
for option in options:
t = ExternalPythonOperator(
task_id=f"ext_py_{option}",
python=PATH_TO_PYTHON_BINARY,
python_callable=branch_with_external_python,
op_args=[options],
python_callable=hello_world_with_external_python,
)
# [END howto_operator_branch_ext_py]
join >> branching_ext_py

join_ext_py = EmptyOperator(
task_id="join_ext_python",
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
)
# Label is optional here, but it can help identify more complex branches
branching_ext_py >> Label(option) >> t >> join_ext_py

def hello_world_with_external_python():
print("Hello World from external Python")
# Example the same with Python virtual environments

for option in options:
t = ExternalPythonOperator(
task_id=f"ext_py_{option}",
python=PATH_TO_PYTHON_BINARY,
python_callable=hello_world_with_external_python,
)
# [START howto_operator_branch_virtualenv]
# Note: Passing a caching dir allows to keep the virtual environment over multiple runs
# Run the example a second time and see that it re-uses it and is faster.
VENV_CACHE_PATH = Path(tempfile.gettempdir())

# Label is optional here, but it can help identify more complex branches
branching_ext_py >> Label(option) >> t >> join_ext_py
def branch_with_venv(choices):
import random

# Example the same with Python virtual environments
import numpy as np

# [START howto_operator_branch_virtualenv]
# Note: Passing a caching dir allows to keep the virtual environment over multiple runs
# Run the example a second time and see that it re-uses it and is faster.
VENV_CACHE_PATH = Path(tempfile.gettempdir())
print(f"Some numpy stuff: {np.arange(6)}")
return f"venv_{random.choice(choices)}"

def branch_with_venv(choices):
import random
branching_venv = BranchPythonVirtualenvOperator(
task_id="branching_venv",
requirements=["numpy~=1.26.0"],
venv_cache_path=VENV_CACHE_PATH,
python_callable=branch_with_venv,
op_args=[options],
)
# [END howto_operator_branch_virtualenv]
join_ext_py >> branching_venv

join_venv = EmptyOperator(
task_id="join_venv",
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
)

import numpy as np
def hello_world_with_venv():
import numpy as np

print(f"Some numpy stuff: {np.arange(6)}")
return f"venv_{random.choice(choices)}"
print(f"Hello World with some numpy stuff: {np.arange(6)}")

branching_venv = BranchPythonVirtualenvOperator(
task_id="branching_venv",
for option in options:
t = PythonVirtualenvOperator(
task_id=f"venv_{option}",
requirements=["numpy~=1.26.0"],
venv_cache_path=VENV_CACHE_PATH,
python_callable=branch_with_venv,
op_args=[options],
)
# [END howto_operator_branch_virtualenv]
join_ext_py >> branching_venv

join_venv = EmptyOperator(
task_id="join_venv",
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
python_callable=hello_world_with_venv,
)

def hello_world_with_venv():
import numpy as np

print(f"Hello World with some numpy stuff: {np.arange(6)}")

for option in options:
t = PythonVirtualenvOperator(
task_id=f"venv_{option}",
requirements=["numpy~=1.26.0"],
venv_cache_path=VENV_CACHE_PATH,
python_callable=hello_world_with_venv,
)

# Label is optional here, but it can help identify more complex branches
branching_venv >> Label(option) >> t >> join_venv
# Label is optional here, but it can help identify more complex branches
branching_venv >> Label(option) >> t >> join_venv
Loading

0 comments on commit cd39453

Please sign in to comment.