Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions airflow/decorators/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ class TaskDecoratorCollection:
env_vars: dict[str, str] | None = None,
inherit_env: bool = True,
use_dill: bool = False,
use_airflow_context: bool = False,
**kwargs,
) -> TaskDecorator:
"""Create a decorator to convert the decorated callable to a virtual environment task.
Expand Down Expand Up @@ -177,7 +176,6 @@ class TaskDecoratorCollection:
:param use_dill: Deprecated, use ``serializer`` instead. Whether to use dill to serialize
the args and result (pickle is default). This allows more complex types
but requires you to include dill in your requirements.
:param use_airflow_context: Whether to provide ``get_current_context()`` to the python_callable.
"""
@overload
def virtualenv(self, python_callable: Callable[FParams, FReturn]) -> Task[FParams, FReturn]: ...
Expand All @@ -194,7 +192,6 @@ class TaskDecoratorCollection:
env_vars: dict[str, str] | None = None,
inherit_env: bool = True,
use_dill: bool = False,
use_airflow_context: bool = False,
**kwargs,
) -> TaskDecorator:
"""Create a decorator to convert the decorated callable to a virtual environment task.
Expand Down Expand Up @@ -228,7 +225,6 @@ class TaskDecoratorCollection:
:param use_dill: Deprecated, use ``serializer`` instead. Whether to use dill to serialize
the args and result (pickle is default). This allows more complex types
but requires you to include dill in your requirements.
:param use_airflow_context: Whether to provide ``get_current_context()`` to the python_callable.
"""
@overload
def branch( # type: ignore[misc]
Expand Down Expand Up @@ -262,7 +258,6 @@ class TaskDecoratorCollection:
venv_cache_path: None | str = None,
show_return_value_in_logs: bool = True,
use_dill: bool = False,
use_airflow_context: bool = False,
**kwargs,
) -> TaskDecorator:
"""Create a decorator to wrap the decorated callable into a BranchPythonVirtualenvOperator.
Expand Down Expand Up @@ -304,7 +299,6 @@ class TaskDecoratorCollection:
:param use_dill: Deprecated, use ``serializer`` instead. Whether to use dill to serialize
the args and result (pickle is default). This allows more complex types
but requires you to include dill in your requirements.
:param use_airflow_context: Whether to provide ``get_current_context()`` to the python_callable.
"""
@overload
def branch_virtualenv(self, python_callable: Callable[FParams, FReturn]) -> Task[FParams, FReturn]: ...
Expand Down
92 changes: 0 additions & 92 deletions airflow/example_dags/example_python_context_decorator.py

This file was deleted.

91 changes: 0 additions & 91 deletions airflow/example_dags/example_python_context_operator.py

This file was deleted.

36 changes: 0 additions & 36 deletions airflow/operators/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,12 @@
from airflow.utils.operator_helpers import ExecutionCallableRunner, KeywordParameters
from airflow.utils.process_utils import execute_in_subprocess
from airflow.utils.python_virtualenv import prepare_virtualenv, write_python_script
from airflow.utils.session import create_session

log = logging.getLogger(__name__)

if TYPE_CHECKING:
from pendulum.datetime import DateTime

from airflow.serialization.enums import Encoding
from airflow.utils.context import Context


Expand Down Expand Up @@ -444,7 +442,6 @@ def __init__(
env_vars: dict[str, str] | None = None,
inherit_env: bool = True,
use_dill: bool = False,
use_airflow_context: bool = False,
**kwargs,
):
if (
Expand Down Expand Up @@ -497,7 +494,6 @@ def __init__(
)
self.env_vars = env_vars
self.inherit_env = inherit_env
self.use_airflow_context = use_airflow_context

@abstractmethod
def _iter_serializable_context_keys(self):
Expand Down Expand Up @@ -544,7 +540,6 @@ def _execute_python_callable_in_subprocess(self, python_path: Path):
string_args_path = tmp_dir / "string_args.txt"
script_path = tmp_dir / "script.py"
termination_log_path = tmp_dir / "termination.log"
airflow_context_path = tmp_dir / "airflow_context.json"

self._write_args(input_path)
self._write_string_args(string_args_path)
Expand All @@ -556,7 +551,6 @@ def _execute_python_callable_in_subprocess(self, python_path: Path):
"pickling_library": self.serializer,
"python_callable": self.python_callable.__name__,
"python_callable_source": self.get_python_source(),
"use_airflow_context": self.use_airflow_context,
}

if inspect.getfile(self.python_callable) == self.dag.fileloc:
Expand All @@ -567,23 +561,6 @@ def _execute_python_callable_in_subprocess(self, python_path: Path):
filename=os.fspath(script_path),
render_template_as_native_obj=self.dag.render_template_as_native_obj,
)
if self.use_airflow_context:
from airflow.serialization.serialized_objects import BaseSerialization

context = get_current_context()
# TODO: `TaskInstance`` will also soon be serialized as expected.
# see more:
# https://github.com/apache/airflow/issues/40974
# https://github.com/apache/airflow/pull/41067
with create_session() as session:
# FIXME: DetachedInstanceError
dag_run, task_instance = context["dag_run"], context["task_instance"]
session.add_all([dag_run, task_instance])
serializable_context: dict[Encoding, Any] = BaseSerialization.serialize(
context, use_pydantic_models=True
)
with airflow_context_path.open("w+") as file:
json.dump(serializable_context, file)

env_vars = dict(os.environ) if self.inherit_env else {}
if self.env_vars:
Expand All @@ -598,7 +575,6 @@ def _execute_python_callable_in_subprocess(self, python_path: Path):
os.fspath(output_path),
os.fspath(string_args_path),
os.fspath(termination_log_path),
os.fspath(airflow_context_path),
],
env=env_vars,
)
Expand Down Expand Up @@ -690,7 +666,6 @@ class PythonVirtualenvOperator(_BasePythonVirtualenvOperator):
:param use_dill: Deprecated, use ``serializer`` instead. Whether to use dill to serialize
the args and result (pickle is default). This allows more complex types
but requires you to include dill in your requirements.
:param use_airflow_context: Whether to provide ``get_current_context()`` to the python_callable.
"""

template_fields: Sequence[str] = tuple(
Expand Down Expand Up @@ -719,7 +694,6 @@ def __init__(
env_vars: dict[str, str] | None = None,
inherit_env: bool = True,
use_dill: bool = False,
use_airflow_context: bool = False,
**kwargs,
):
if (
Expand All @@ -741,9 +715,6 @@ def __init__(
)
if not is_venv_installed():
raise AirflowException("PythonVirtualenvOperator requires virtualenv, please install it.")
if use_airflow_context and (not expect_airflow and not system_site_packages):
error_msg = "use_airflow_context is set to True, but expect_airflow and system_site_packages are set to False."
raise AirflowException(error_msg)
if not requirements:
self.requirements: list[str] = []
elif isinstance(requirements, str):
Expand Down Expand Up @@ -773,7 +744,6 @@ def __init__(
env_vars=env_vars,
inherit_env=inherit_env,
use_dill=use_dill,
use_airflow_context=use_airflow_context,
**kwargs,
)

Expand Down Expand Up @@ -992,7 +962,6 @@ class ExternalPythonOperator(_BasePythonVirtualenvOperator):
:param use_dill: Deprecated, use ``serializer`` instead. Whether to use dill to serialize
the args and result (pickle is default). This allows more complex types
but requires you to include dill in your requirements.
:param use_airflow_context: Whether to provide ``get_current_context()`` to the python_callable.
"""

template_fields: Sequence[str] = tuple({"python"}.union(PythonOperator.template_fields))
Expand All @@ -1014,14 +983,10 @@ def __init__(
env_vars: dict[str, str] | None = None,
inherit_env: bool = True,
use_dill: bool = False,
use_airflow_context: bool = False,
**kwargs,
):
if not python:
raise ValueError("Python Path must be defined in ExternalPythonOperator")
if use_airflow_context and not expect_airflow:
error_msg = "use_airflow_context is set to True, but expect_airflow is set to False."
raise AirflowException(error_msg)
self.python = python
self.expect_pendulum = expect_pendulum
super().__init__(
Expand All @@ -1037,7 +1002,6 @@ def __init__(
env_vars=env_vars,
inherit_env=inherit_env,
use_dill=use_dill,
use_airflow_context=use_airflow_context,
**kwargs,
)

Expand Down
Loading