From 26419f1e7289067dedf8be8265a1ebf68b165332 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Mon, 21 Jul 2025 16:00:13 +0100 Subject: [PATCH 1/9] Move some items in `util.context` to appropriate places context_manager was only used once in airflow-core, so I redefined it where it was used. Likewise context_copy_partial. Then Context was added to the depreccation for easy import --- airflow-core/src/airflow/models/dag.py | 1 + .../src/airflow/models/taskinstance.py | 23 +++++++++++-- .../serialization/serialized_objects.py | 2 +- airflow-core/src/airflow/utils/context.py | 34 ------------------- .../kubernetes/decorators/kubernetes_cmd.py | 4 +-- .../cncf/kubernetes/version_compat.py | 3 ++ .../providers/standard/decorators/bash.py | 2 +- .../providers/standard/operators/python.py | 17 +++++++--- .../providers/standard/sensors/python.py | 5 ++- .../providers/standard/version_compat.py | 3 ++ task-sdk/src/airflow/sdk/bases/notifier.py | 2 +- .../src/airflow/sdk/definitions/context.py | 19 +++++++++++ 12 files changed, 66 insertions(+), 49 deletions(-) diff --git a/airflow-core/src/airflow/models/dag.py b/airflow-core/src/airflow/models/dag.py index c4f06637c3cfe..6efcaf0b78e52 100644 --- a/airflow-core/src/airflow/models/dag.py +++ b/airflow-core/src/airflow/models/dag.py @@ -85,6 +85,7 @@ from airflow.models.tasklog import LogTemplate from airflow.sdk import TaskGroup from airflow.sdk.definitions.asset import Asset, AssetAlias, AssetUniqueKey, BaseAsset +from airflow.sdk.definitions.context import Context from airflow.sdk.definitions.dag import DAG as TaskSDKDag, dag as task_sdk_dag_decorator from airflow.sdk.definitions.deadline import DeadlineAlert, DeadlineReference from airflow.settings import json diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index 5f058a6c9229d..0b93ad080b37a 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -89,6 +89,7 @@ from airflow.models.taskmap import TaskMap from airflow.models.taskreschedule import TaskReschedule from airflow.models.xcom import XCOM_RETURN_KEY, LazyXComSelectSequence, XComModel +from airflow.sdk.definitions.context import Context from airflow.settings import task_instance_mutation_hook from airflow.stats import Stats from airflow.ti_deps.dep_context import DepContext @@ -130,7 +131,6 @@ from airflow.sdk.definitions.taskgroup import MappedTaskGroup, TaskGroup from airflow.sdk.types import RuntimeTaskInstanceProtocol from airflow.serialization.serialized_objects import SerializedBaseOperator - from airflow.utils.context import Context Operator: TypeAlias = BaseOperator | MappedOperator @@ -367,11 +367,28 @@ def _get_email_subject_content( else: from airflow.sdk.definitions._internal.templater import SandboxedEnvironment - from airflow.utils.context import context_merge if TYPE_CHECKING: assert task_instance.task + def _context_merge(context: Context, *args: Any, **kwargs: Any) -> None: + """ + Merge parameters into an existing context. + + Like ``dict.update()`` , this take the same parameters, and updates + ``context`` in-place. + + This is implemented as a free function because the ``Context`` type is + "faked" as a ``TypedDict`` in ``context.pyi``, which cannot have custom + functions. + + :meta private: + """ + if not context: + context = Context() + + context.update(*args, **kwargs) + # Use the DAG's get_template_env() to set force_sandboxed. Don't add # the flag to the function on task object -- that function can be # overridden, and adding a flag breaks backward compatibility. @@ -381,7 +398,7 @@ def _get_email_subject_content( else: jinja_env = SandboxedEnvironment(cache_size=0) jinja_context = task_instance.get_template_context() - context_merge(jinja_context, additional_context) + _context_merge(jinja_context, additional_context) def render(key: str, content: str) -> str: if conf.has_option("email", key): diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py b/airflow-core/src/airflow/serialization/serialized_objects.py index cadd701248e73..a787e6a48e529 100644 --- a/airflow-core/src/airflow/serialization/serialized_objects.py +++ b/airflow-core/src/airflow/serialization/serialized_objects.py @@ -63,6 +63,7 @@ AssetUniqueKey, BaseAsset, ) +from airflow.sdk.definitions.context import Context from airflow.sdk.definitions.deadline import DeadlineAlert from airflow.sdk.definitions.mappedoperator import MappedOperator from airflow.sdk.definitions.param import Param, ParamsDict @@ -88,7 +89,6 @@ from airflow.utils.code_utils import get_python_source from airflow.utils.context import ( ConnectionAccessor, - Context, VariableAccessor, ) from airflow.utils.db import LazySelectSequence diff --git a/airflow-core/src/airflow/utils/context.py b/airflow-core/src/airflow/utils/context.py index c27032c7c3c20..776f8d1402077 100644 --- a/airflow-core/src/airflow/utils/context.py +++ b/airflow-core/src/airflow/utils/context.py @@ -19,13 +19,9 @@ from __future__ import annotations -from collections.abc import ( - Container, -) from typing import ( TYPE_CHECKING, Any, - cast, ) from sqlalchemy import select @@ -33,7 +29,6 @@ from airflow.models.asset import ( AssetModel, ) -from airflow.sdk.definitions.context import Context from airflow.sdk.execution_time.context import ( ConnectionAccessor as ConnectionAccessorSDK, OutletEventAccessors as OutletEventAccessorsSDK, @@ -146,32 +141,3 @@ def _get_asset_from_db(name: str | None = None, uri: str | None = None) -> Asset raise ValueError("Either name or uri must be provided") return asset.to_public() - - -def context_merge(context: Context, *args: Any, **kwargs: Any) -> None: - """ - Merge parameters into an existing context. - - Like ``dict.update()`` , this take the same parameters, and updates - ``context`` in-place. - - This is implemented as a free function because the ``Context`` type is - "faked" as a ``TypedDict`` in ``context.pyi``, which cannot have custom - functions. - - :meta private: - """ - if not context: - context = Context() - - context.update(*args, **kwargs) - - -def context_copy_partial(source: Context, keys: Container[str]) -> Context: - """ - Create a context by copying items under selected keys in ``source``. - - :meta private: - """ - new = {k: v for k, v in source.items() if k in keys} - return cast("Context", new) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py index 91d541ddf8e88..87d0cf8e8eacf 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py @@ -24,13 +24,13 @@ from airflow.providers.cncf.kubernetes.version_compat import ( DecoratedOperator, TaskDecorator, + context_merge, task_decorator_factory, ) -from airflow.utils.context import context_merge from airflow.utils.operator_helpers import determine_kwargs if TYPE_CHECKING: - from airflow.utils.context import Context + from airflow.utils.context import Context # type: ignore[attr-defined, no-redef] class _KubernetesCmdDecoratedOperator(DecoratedOperator, KubernetesPodOperator): diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/version_compat.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/version_compat.py index e14332d52f29b..da0c09bab59e4 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/version_compat.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/version_compat.py @@ -38,9 +38,11 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]: if AIRFLOW_V_3_1_PLUS: from airflow.models.xcom import XCOM_RETURN_KEY from airflow.sdk import BaseHook, BaseOperator + from airflow.sdk.definitions.context import context_merge else: from airflow.hooks.base import BaseHook # type: ignore[attr-defined,no-redef] from airflow.models import BaseOperator + from airflow.utils.context import context_merge # type: ignore[attr-defined, no-redef] from airflow.utils.xcom import XCOM_RETURN_KEY # type: ignore[no-redef] if AIRFLOW_V_3_0_PLUS: @@ -64,4 +66,5 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]: "TaskDecorator", "task_decorator_factory", "XCOM_RETURN_KEY", + "context_merge", ] diff --git a/providers/standard/src/airflow/providers/standard/decorators/bash.py b/providers/standard/src/airflow/providers/standard/decorators/bash.py index 59dcfe33be810..71962b0da13be 100644 --- a/providers/standard/src/airflow/providers/standard/decorators/bash.py +++ b/providers/standard/src/airflow/providers/standard/decorators/bash.py @@ -33,8 +33,8 @@ ) from airflow.providers.standard.operators.bash import BashOperator +from airflow.providers.standard.version_compat import context_merge from airflow.sdk.definitions._internal.types import SET_DURING_EXECUTION -from airflow.utils.context import context_merge from airflow.utils.operator_helpers import determine_kwargs if TYPE_CHECKING: diff --git a/providers/standard/src/airflow/providers/standard/operators/python.py b/providers/standard/src/airflow/providers/standard/operators/python.py index 53c95886ae176..8449c45b4109c 100644 --- a/providers/standard/src/airflow/providers/standard/operators/python.py +++ b/providers/standard/src/airflow/providers/standard/operators/python.py @@ -51,9 +51,8 @@ from airflow.models.variable import Variable from airflow.providers.standard.hooks.package_index import PackageIndexHook from airflow.providers.standard.utils.python_virtualenv import prepare_virtualenv, write_python_script -from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS, BaseOperator +from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS, BaseOperator, context_merge from airflow.utils import hashlib_wrapper -from airflow.utils.context import context_copy_partial, context_merge from airflow.utils.file import get_unique_dag_module_name from airflow.utils.operator_helpers import KeywordParameters from airflow.utils.process_utils import execute_in_subprocess @@ -79,7 +78,7 @@ try: from airflow.sdk.definitions.context import Context except ImportError: # TODO: Remove once provider drops support for Airflow 2 - from airflow.utils.context import Context + from airflow.utils.context import Context # type: ignore[attr-defined, no-redef] _SerializerTypeDef = Literal["pickle", "cloudpickle", "dill"] @@ -485,9 +484,19 @@ def __init__( def _iter_serializable_context_keys(self): pass + @staticmethod + def context_copy_partial(source: Context, keys: Container[str]) -> Context: + """ + Create a context by copying items under selected keys in ``source``. + + :meta private: + """ + new = {k: v for k, v in source.items() if k in keys} + return cast("Context", new) + def execute(self, context: Context) -> Any: serializable_keys = set(self._iter_serializable_context_keys()) - serializable_context = context_copy_partial(context, serializable_keys) + serializable_context = self.context_copy_partial(context, serializable_keys) return super().execute(context=serializable_context) def get_python_source(self): diff --git a/providers/standard/src/airflow/providers/standard/sensors/python.py b/providers/standard/src/airflow/providers/standard/sensors/python.py index d6fc454018cc2..a8bcce7017ca3 100644 --- a/providers/standard/src/airflow/providers/standard/sensors/python.py +++ b/providers/standard/src/airflow/providers/standard/sensors/python.py @@ -20,8 +20,7 @@ from collections.abc import Callable, Mapping, Sequence from typing import TYPE_CHECKING, Any -from airflow.providers.standard.version_compat import BaseSensorOperator, PokeReturnValue -from airflow.utils.context import context_merge +from airflow.providers.standard.version_compat import BaseSensorOperator, PokeReturnValue, context_merge from airflow.utils.operator_helpers import determine_kwargs if TYPE_CHECKING: @@ -29,7 +28,7 @@ from airflow.sdk.definitions.context import Context except ImportError: # TODO: Remove once provider drops support for Airflow 2 - from airflow.utils.context import Context + from airflow.utils.context import Context # type: ignore[no-redef, attr-defined] class PythonSensor(BaseSensorOperator): diff --git a/providers/standard/src/airflow/providers/standard/version_compat.py b/providers/standard/src/airflow/providers/standard/version_compat.py index a2b96fe6dec0d..ac3864f9446f3 100644 --- a/providers/standard/src/airflow/providers/standard/version_compat.py +++ b/providers/standard/src/airflow/providers/standard/version_compat.py @@ -40,9 +40,11 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]: # even though it wasn't used. if AIRFLOW_V_3_1_PLUS: from airflow.sdk import BaseHook, BaseOperator + from airflow.sdk.definitions.context import context_merge else: from airflow.hooks.base import BaseHook # type: ignore[attr-defined,no-redef] from airflow.models.baseoperator import BaseOperator # type: ignore[no-redef] + from airflow.utils.context import context_merge # type: ignore[no-redef, attr-defined] if AIRFLOW_V_3_0_PLUS: from airflow.sdk import BaseOperatorLink @@ -59,4 +61,5 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]: "BaseHook", "BaseSensorOperator", "PokeReturnValue", + "context_merge", ] diff --git a/task-sdk/src/airflow/sdk/bases/notifier.py b/task-sdk/src/airflow/sdk/bases/notifier.py index 457abc7f1407c..df4023d043a23 100644 --- a/task-sdk/src/airflow/sdk/bases/notifier.py +++ b/task-sdk/src/airflow/sdk/bases/notifier.py @@ -22,7 +22,7 @@ from typing import TYPE_CHECKING from airflow.sdk.definitions._internal.templater import Templater -from airflow.utils.context import context_merge +from airflow.sdk.definitions.context import context_merge from airflow.utils.log.logging_mixin import LoggingMixin if TYPE_CHECKING: diff --git a/task-sdk/src/airflow/sdk/definitions/context.py b/task-sdk/src/airflow/sdk/definitions/context.py index 1f3dc67666b14..2bb4f0013c4c8 100644 --- a/task-sdk/src/airflow/sdk/definitions/context.py +++ b/task-sdk/src/airflow/sdk/definitions/context.py @@ -83,6 +83,25 @@ class Context(TypedDict, total=False): var: Any +def context_merge(context: Context, *args: Any, **kwargs: Any) -> None: + """ + Merge parameters into an existing context. + + Like ``dict.update()`` , this take the same parameters, and updates + ``context`` in-place. + + This is implemented as a free function because the ``Context`` type is + "faked" as a ``TypedDict`` in ``context.pyi``, which cannot have custom + functions. + + :meta private: + """ + if not context: + context = Context() + + context.update(*args, **kwargs) + + def get_current_context() -> Context: """ Retrieve the execution context dictionary without altering user method's signature. From 19314fd86270190a7148a83f1aa6000e5c06763b Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Tue, 29 Jul 2025 14:00:20 +0100 Subject: [PATCH 2/9] Only remove core dependency in sdk --- airflow-core/src/airflow/models/dag.py | 1 - .../src/airflow/models/taskinstance.py | 2 +- airflow-core/src/airflow/utils/context.py | 34 +++++++++++++++++++ task-sdk/src/airflow/sdk/bases/decorator.py | 3 +- .../src/airflow/sdk/definitions/context.py | 3 ++ .../airflow/sdk/definitions/mappedoperator.py | 2 +- 6 files changed, 41 insertions(+), 4 deletions(-) diff --git a/airflow-core/src/airflow/models/dag.py b/airflow-core/src/airflow/models/dag.py index 6efcaf0b78e52..c4f06637c3cfe 100644 --- a/airflow-core/src/airflow/models/dag.py +++ b/airflow-core/src/airflow/models/dag.py @@ -85,7 +85,6 @@ from airflow.models.tasklog import LogTemplate from airflow.sdk import TaskGroup from airflow.sdk.definitions.asset import Asset, AssetAlias, AssetUniqueKey, BaseAsset -from airflow.sdk.definitions.context import Context from airflow.sdk.definitions.dag import DAG as TaskSDKDag, dag as task_sdk_dag_decorator from airflow.sdk.definitions.deadline import DeadlineAlert, DeadlineReference from airflow.settings import json diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index 0b93ad080b37a..cdd1c8f517adc 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -89,11 +89,11 @@ from airflow.models.taskmap import TaskMap from airflow.models.taskreschedule import TaskReschedule from airflow.models.xcom import XCOM_RETURN_KEY, LazyXComSelectSequence, XComModel -from airflow.sdk.definitions.context import Context from airflow.settings import task_instance_mutation_hook from airflow.stats import Stats from airflow.ti_deps.dep_context import DepContext from airflow.ti_deps.dependencies_deps import REQUEUEABLE_DEPS, RUNNING_DEPS +from airflow.utils.context import Context from airflow.utils.email import send_email from airflow.utils.helpers import prune_dict, render_template_to_string from airflow.utils.log.logging_mixin import LoggingMixin diff --git a/airflow-core/src/airflow/utils/context.py b/airflow-core/src/airflow/utils/context.py index 776f8d1402077..c27032c7c3c20 100644 --- a/airflow-core/src/airflow/utils/context.py +++ b/airflow-core/src/airflow/utils/context.py @@ -19,9 +19,13 @@ from __future__ import annotations +from collections.abc import ( + Container, +) from typing import ( TYPE_CHECKING, Any, + cast, ) from sqlalchemy import select @@ -29,6 +33,7 @@ from airflow.models.asset import ( AssetModel, ) +from airflow.sdk.definitions.context import Context from airflow.sdk.execution_time.context import ( ConnectionAccessor as ConnectionAccessorSDK, OutletEventAccessors as OutletEventAccessorsSDK, @@ -141,3 +146,32 @@ def _get_asset_from_db(name: str | None = None, uri: str | None = None) -> Asset raise ValueError("Either name or uri must be provided") return asset.to_public() + + +def context_merge(context: Context, *args: Any, **kwargs: Any) -> None: + """ + Merge parameters into an existing context. + + Like ``dict.update()`` , this take the same parameters, and updates + ``context`` in-place. + + This is implemented as a free function because the ``Context`` type is + "faked" as a ``TypedDict`` in ``context.pyi``, which cannot have custom + functions. + + :meta private: + """ + if not context: + context = Context() + + context.update(*args, **kwargs) + + +def context_copy_partial(source: Context, keys: Container[str]) -> Context: + """ + Create a context by copying items under selected keys in ``source``. + + :meta private: + """ + new = {k: v for k, v in source.items() if k in keys} + return cast("Context", new) diff --git a/task-sdk/src/airflow/sdk/bases/decorator.py b/task-sdk/src/airflow/sdk/bases/decorator.py index f143acee5fe08..ed350dce905d2 100644 --- a/task-sdk/src/airflow/sdk/bases/decorator.py +++ b/task-sdk/src/airflow/sdk/bases/decorator.py @@ -46,13 +46,14 @@ ) from airflow.sdk.definitions._internal.types import NOTSET from airflow.sdk.definitions.asset import Asset +from airflow.sdk.definitions.context import KNOWN_CONTEXT_KEYS from airflow.sdk.definitions.mappedoperator import ( MappedOperator, ensure_xcomarg_return_value, prevent_duplicates, ) from airflow.sdk.definitions.xcom_arg import XComArg -from airflow.utils.context import KNOWN_CONTEXT_KEYS +from airflow.utils.helpers import prevent_duplicates from airflow.utils.trigger_rule import TriggerRule if TYPE_CHECKING: diff --git a/task-sdk/src/airflow/sdk/definitions/context.py b/task-sdk/src/airflow/sdk/definitions/context.py index 2bb4f0013c4c8..10d7facfa9179 100644 --- a/task-sdk/src/airflow/sdk/definitions/context.py +++ b/task-sdk/src/airflow/sdk/definitions/context.py @@ -83,6 +83,9 @@ class Context(TypedDict, total=False): var: Any +KNOWN_CONTEXT_KEYS: set[str] = set(Context.__annotations__.keys()) + + def context_merge(context: Context, *args: Any, **kwargs: Any) -> None: """ Merge parameters into an existing context. diff --git a/task-sdk/src/airflow/sdk/definitions/mappedoperator.py b/task-sdk/src/airflow/sdk/definitions/mappedoperator.py index 7fa35380920f9..551f17865e8c0 100644 --- a/task-sdk/src/airflow/sdk/definitions/mappedoperator.py +++ b/task-sdk/src/airflow/sdk/definitions/mappedoperator.py @@ -67,12 +67,12 @@ from airflow.sdk.bases.operator import BaseOperator from airflow.sdk.bases.operatorlink import BaseOperatorLink from airflow.sdk.definitions._internal.expandinput import ExpandInput + from airflow.sdk.definitions.context import Context from airflow.sdk.definitions.dag import DAG from airflow.sdk.definitions.param import ParamsDict from airflow.sdk.definitions.taskgroup import TaskGroup from airflow.sdk.definitions.xcom_arg import XComArg from airflow.triggers.base import StartTriggerArgs - from airflow.utils.context import Context from airflow.utils.operator_resources import Resources from airflow.utils.trigger_rule import TriggerRule From 82eb9e21e6be3f49a12afbd847a013464744dc96 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Tue, 29 Jul 2025 15:07:37 +0100 Subject: [PATCH 3/9] fixup! Only remove core dependency in sdk --- airflow-core/src/airflow/serialization/serialized_objects.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py b/airflow-core/src/airflow/serialization/serialized_objects.py index a787e6a48e529..cadd701248e73 100644 --- a/airflow-core/src/airflow/serialization/serialized_objects.py +++ b/airflow-core/src/airflow/serialization/serialized_objects.py @@ -63,7 +63,6 @@ AssetUniqueKey, BaseAsset, ) -from airflow.sdk.definitions.context import Context from airflow.sdk.definitions.deadline import DeadlineAlert from airflow.sdk.definitions.mappedoperator import MappedOperator from airflow.sdk.definitions.param import Param, ParamsDict @@ -89,6 +88,7 @@ from airflow.utils.code_utils import get_python_source from airflow.utils.context import ( ConnectionAccessor, + Context, VariableAccessor, ) from airflow.utils.db import LazySelectSequence From 8b941a0a028ec8a102e8e61baa77bf1fe03f7e74 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Tue, 29 Jul 2025 15:15:24 +0100 Subject: [PATCH 4/9] Use class obj instead of self --- .../standard/src/airflow/providers/standard/operators/python.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/standard/src/airflow/providers/standard/operators/python.py b/providers/standard/src/airflow/providers/standard/operators/python.py index 8449c45b4109c..1c0f43bb3e536 100644 --- a/providers/standard/src/airflow/providers/standard/operators/python.py +++ b/providers/standard/src/airflow/providers/standard/operators/python.py @@ -496,7 +496,7 @@ def context_copy_partial(source: Context, keys: Container[str]) -> Context: def execute(self, context: Context) -> Any: serializable_keys = set(self._iter_serializable_context_keys()) - serializable_context = self.context_copy_partial(context, serializable_keys) + serializable_context = _BasePythonVirtualenvOperator.context_copy_partial(context, serializable_keys) return super().execute(context=serializable_context) def get_python_source(self): From 0658c69f8a4ed59490f3d3874e092989930e643d Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Tue, 29 Jul 2025 16:06:07 +0100 Subject: [PATCH 5/9] fixup! Use class obj instead of self --- airflow-core/src/airflow/models/taskinstance.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index cdd1c8f517adc..4d195cf1e0c71 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -93,7 +93,6 @@ from airflow.stats import Stats from airflow.ti_deps.dep_context import DepContext from airflow.ti_deps.dependencies_deps import REQUEUEABLE_DEPS, RUNNING_DEPS -from airflow.utils.context import Context from airflow.utils.email import send_email from airflow.utils.helpers import prune_dict, render_template_to_string from airflow.utils.log.logging_mixin import LoggingMixin @@ -131,6 +130,7 @@ from airflow.sdk.definitions.taskgroup import MappedTaskGroup, TaskGroup from airflow.sdk.types import RuntimeTaskInstanceProtocol from airflow.serialization.serialized_objects import SerializedBaseOperator + from airflow.utils.context import Context Operator: TypeAlias = BaseOperator | MappedOperator @@ -367,6 +367,7 @@ def _get_email_subject_content( else: from airflow.sdk.definitions._internal.templater import SandboxedEnvironment + from airflow.utils.context import Context if TYPE_CHECKING: assert task_instance.task From a1e2a03f706e16c338be4ca60e1b3c0346bc4b83 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Thu, 31 Jul 2025 16:32:08 +0100 Subject: [PATCH 6/9] fixup! fixup! Use class obj instead of self --- .../src/airflow/models/taskinstance.py | 23 ++++--------------- .../kubernetes/decorators/kubernetes_cmd.py | 2 +- .../providers/standard/operators/python.py | 9 +++----- 3 files changed, 8 insertions(+), 26 deletions(-) diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index 4d195cf1e0c71..9da8285b00f13 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -372,24 +372,6 @@ def _get_email_subject_content( if TYPE_CHECKING: assert task_instance.task - def _context_merge(context: Context, *args: Any, **kwargs: Any) -> None: - """ - Merge parameters into an existing context. - - Like ``dict.update()`` , this take the same parameters, and updates - ``context`` in-place. - - This is implemented as a free function because the ``Context`` type is - "faked" as a ``TypedDict`` in ``context.pyi``, which cannot have custom - functions. - - :meta private: - """ - if not context: - context = Context() - - context.update(*args, **kwargs) - # Use the DAG's get_template_env() to set force_sandboxed. Don't add # the flag to the function on task object -- that function can be # overridden, and adding a flag breaks backward compatibility. @@ -399,7 +381,10 @@ def _context_merge(context: Context, *args: Any, **kwargs: Any) -> None: else: jinja_env = SandboxedEnvironment(cache_size=0) jinja_context = task_instance.get_template_context() - _context_merge(jinja_context, additional_context) + if not jinja_context: + jinja_context = Context() + # Add additional fields to the context for email template rendering + jinja_context.update(additional_context) # type: ignore[typeddict-item] def render(key: str, content: str) -> str: if conf.has_option("email", key): diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py index 87d0cf8e8eacf..483fb465e071a 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py @@ -30,7 +30,7 @@ from airflow.utils.operator_helpers import determine_kwargs if TYPE_CHECKING: - from airflow.utils.context import Context # type: ignore[attr-defined, no-redef] + from airflow.utils.context import Context class _KubernetesCmdDecoratedOperator(DecoratedOperator, KubernetesPodOperator): diff --git a/providers/standard/src/airflow/providers/standard/operators/python.py b/providers/standard/src/airflow/providers/standard/operators/python.py index 1c0f43bb3e536..0fe3d479d3770 100644 --- a/providers/standard/src/airflow/providers/standard/operators/python.py +++ b/providers/standard/src/airflow/providers/standard/operators/python.py @@ -74,11 +74,7 @@ from airflow.sdk.execution_time.callback_runner import ExecutionCallableRunner from airflow.sdk.execution_time.context import OutletEventAccessorsProtocol - - try: - from airflow.sdk.definitions.context import Context - except ImportError: # TODO: Remove once provider drops support for Airflow 2 - from airflow.utils.context import Context # type: ignore[attr-defined, no-redef] + from airflow.utils.context import Context _SerializerTypeDef = Literal["pickle", "cloudpickle", "dill"] @@ -496,7 +492,8 @@ def context_copy_partial(source: Context, keys: Container[str]) -> Context: def execute(self, context: Context) -> Any: serializable_keys = set(self._iter_serializable_context_keys()) - serializable_context = _BasePythonVirtualenvOperator.context_copy_partial(context, serializable_keys) + new = {k: v for k, v in context.items() if k in serializable_keys} + serializable_context = cast("Context", new) return super().execute(context=serializable_context) def get_python_source(self): From 8542747bf06a948f90e450e49798cbbb7d72841a Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Thu, 31 Jul 2025 16:43:45 +0100 Subject: [PATCH 7/9] fixup! fixup! fixup! Use class obj instead of self --- .../src/airflow/providers/standard/operators/python.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/providers/standard/src/airflow/providers/standard/operators/python.py b/providers/standard/src/airflow/providers/standard/operators/python.py index 0fe3d479d3770..d127816f36c8b 100644 --- a/providers/standard/src/airflow/providers/standard/operators/python.py +++ b/providers/standard/src/airflow/providers/standard/operators/python.py @@ -480,16 +480,6 @@ def __init__( def _iter_serializable_context_keys(self): pass - @staticmethod - def context_copy_partial(source: Context, keys: Container[str]) -> Context: - """ - Create a context by copying items under selected keys in ``source``. - - :meta private: - """ - new = {k: v for k, v in source.items() if k in keys} - return cast("Context", new) - def execute(self, context: Context) -> Any: serializable_keys = set(self._iter_serializable_context_keys()) new = {k: v for k, v in context.items() if k in serializable_keys} From dbfc84635e9cbbffd4984976b879e49e9eacab3e Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Mon, 11 Aug 2025 13:09:41 +0100 Subject: [PATCH 8/9] fixup! fixup! fixup! fixup! Use class obj instead of self --- .../src/airflow/providers/standard/operators/python.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/providers/standard/src/airflow/providers/standard/operators/python.py b/providers/standard/src/airflow/providers/standard/operators/python.py index d127816f36c8b..08aa4905770aa 100644 --- a/providers/standard/src/airflow/providers/standard/operators/python.py +++ b/providers/standard/src/airflow/providers/standard/operators/python.py @@ -74,7 +74,11 @@ from airflow.sdk.execution_time.callback_runner import ExecutionCallableRunner from airflow.sdk.execution_time.context import OutletEventAccessorsProtocol - from airflow.utils.context import Context + + try: + from airflow.sdk.definitions.context import Context + except ImportError: # TODO: Remove once provider drops support for Airflow 2 + from airflow.utils.context import Context _SerializerTypeDef = Literal["pickle", "cloudpickle", "dill"] From 65faa2a675d128b5adbde7861249556de64fef41 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Mon, 11 Aug 2025 13:16:17 +0100 Subject: [PATCH 9/9] fixup! fixup! fixup! fixup! fixup! Use class obj instead of self --- airflow-core/src/airflow/models/taskinstance.py | 2 +- task-sdk/src/airflow/sdk/bases/decorator.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index 9da8285b00f13..6dcb7f9b08dc2 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -367,7 +367,7 @@ def _get_email_subject_content( else: from airflow.sdk.definitions._internal.templater import SandboxedEnvironment - from airflow.utils.context import Context + from airflow.sdk.definitions.context import Context if TYPE_CHECKING: assert task_instance.task diff --git a/task-sdk/src/airflow/sdk/bases/decorator.py b/task-sdk/src/airflow/sdk/bases/decorator.py index ed350dce905d2..6c56a641014d9 100644 --- a/task-sdk/src/airflow/sdk/bases/decorator.py +++ b/task-sdk/src/airflow/sdk/bases/decorator.py @@ -53,7 +53,6 @@ prevent_duplicates, ) from airflow.sdk.definitions.xcom_arg import XComArg -from airflow.utils.helpers import prevent_duplicates from airflow.utils.trigger_rule import TriggerRule if TYPE_CHECKING: