diff --git a/airflow-core/docs/conf.py b/airflow-core/docs/conf.py
index 402d7db723089..545c95e712168 100644
--- a/airflow-core/docs/conf.py
+++ b/airflow-core/docs/conf.py
@@ -120,7 +120,6 @@ PACKAGES_THAT_WE_SHOULD_ADD_TO_API_DOCS = {
     "hooks",
-    "decorators",
     "example_dags",
     "executors",
     "operators",
@@ -140,15 +139,6 @@ MODELS_THAT_SHOULD_BE_INCLUDED_IN_API_DOCS: set[str] = {
     "baseoperator.py",
-    "connection.py",
-    "dag.py",
-    "dagrun.py",
-    "dagbag.py",
-    "param.py",
-    "taskinstance.py",
-    "taskinstancekey.py",
-    "variable.py",
-    "xcom.py",
 }
diff --git a/airflow-core/docs/core-concepts/params.rst b/airflow-core/docs/core-concepts/params.rst
index 805da212d352a..f6d8a2c5c7a89 100644
--- a/airflow-core/docs/core-concepts/params.rst
+++ b/airflow-core/docs/core-concepts/params.rst
@@ -32,7 +32,7 @@ If the user-supplied values don't pass validation, Airflow shows a warning inste
 DAG-level Params
 ----------------

-To add Params to a :class:`~airflow.models.dag.DAG`, initialize it with the ``params`` kwarg.
+To add Params to a :class:`~airflow.sdk.DAG`, initialize it with the ``params`` kwarg.
 Use a dictionary that maps Param names to either a :class:`~airflow.sdk.definitions.param.Param` or an object indicating the parameter's default value.

 .. code-block::
diff --git a/airflow-core/docs/core-concepts/variables.rst b/airflow-core/docs/core-concepts/variables.rst
index db0ffacb0884f..6487fd0c131a3 100644
--- a/airflow-core/docs/core-concepts/variables.rst
+++ b/airflow-core/docs/core-concepts/variables.rst
@@ -33,6 +33,20 @@ To use them, just import and call ``get`` on the Variable model::
     # Returns the value of default (None) if the variable is not set
     baz = Variable.get("baz", default=None)

+You can also access variables through the Task Context using
+:func:`~airflow.sdk.get_current_context`:
+
+.. code-block:: python
+
+    from airflow.sdk import get_current_context
+
+
+    def my_task():
+        context = get_current_context()
+        var = context["var"]
+        my_variable = var.value.get("my_variable_name")
+        return my_variable
+
 You can also use them from :ref:`templates `::

     # Raw value
diff --git a/airflow-core/docs/core-concepts/xcoms.rst b/airflow-core/docs/core-concepts/xcoms.rst
index 2be9b75bbf849..93463a752768e 100644
--- a/airflow-core/docs/core-concepts/xcoms.rst
+++ b/airflow-core/docs/core-concepts/xcoms.rst
@@ -25,6 +25,9 @@ XComs (short for "cross-communications") are a mechanism that let :doc:`tasks` t
 An XCom is identified by a ``key`` (essentially its name), as well as the ``task_id`` and ``dag_id`` it came from. They can have any serializable value (including objects that are decorated with ``@dataclass`` or ``@attr.define``, see :ref:`TaskFlow arguments `), but they are only designed for small amounts of data; do not use them to pass around large values, like dataframes.

+XCom operations should be performed through the Task Context using
+:func:`~airflow.sdk.get_current_context`. Directly updating XComs through the XCom database model is no longer possible.
+
 XComs are explicitly "pushed" and "pulled" to/from their storage using the ``xcom_push`` and ``xcom_pull`` methods on Task Instances.
 To push a value within a task called **"task-1"** that will be used by another task:
@@ -73,8 +76,6 @@ An example of pushing multiple XComs and pulling them individually:
     # Pulling entire xcom data from push_multiple task
     data = context["ti"].xcom_pull(task_ids="push_multiple", key="return_value")
-
-
 .. note:: If the first task run did not succeed, then on every retry the task's XComs will be cleared to make the task run idempotent.
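+To make the Task Context workflow concrete, here is a minimal sketch of pushing and pulling XComs
+through :func:`~airflow.sdk.get_current_context` (the task ids and the key are illustrative):
+
+.. code-block:: python
+
+    from airflow.sdk import get_current_context
+
+
+    def my_task():
+        context = get_current_context()
+        ti = context["ti"]
+
+        # Push a value under an explicit key for downstream tasks.
+        ti.xcom_push(key="my_key", value={"rows": 42})
+
+        # Pull a value pushed by an upstream task.
+        upstream_value = ti.xcom_pull(task_ids="producer_task", key="my_key")
+        return upstream_value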
@@ -91,7 +92,7 @@ Custom XCom Backends
 The XCom system has interchangeable backends, and you can set which backend is being used via the ``xcom_backend`` configuration option.

-If you want to implement your own backend, you should subclass :class:`~airflow.models.xcom.BaseXCom`, and override the ``serialize_value`` and ``deserialize_value`` methods.
+If you want to implement your own backend, you should subclass :class:`~airflow.sdk.bases.xcom.BaseXCom`, and override the ``serialize_value`` and ``deserialize_value`` methods.

 You can override the ``purge`` method in the ``BaseXCom`` class to control how XCom data is purged from the custom backend. This will be called as part of ``delete``.
@@ -104,6 +105,6 @@ If you can exec into a terminal in an Airflow container, you can then print out

 .. code-block:: python

-    from airflow.models.xcom import XCom
+    from airflow.sdk.execution_time.xcom import XCom

     print(XCom.__name__)
diff --git a/airflow-core/docs/howto/connection.rst b/airflow-core/docs/howto/connection.rst
index 84aa1648b8224..e58d0260db49b 100644
--- a/airflow-core/docs/howto/connection.rst
+++ b/airflow-core/docs/howto/connection.rst
@@ -22,7 +22,7 @@ Managing Connections

 For an overview of hooks and connections, see :doc:`/authoring-and-scheduling/connections`.

-Airflow's :class:`~airflow.models.connection.Connection` object is used for storing credentials and other information necessary for connecting to external services.
+Airflow's :class:`~airflow.sdk.Connection` object is used for storing credentials and other information necessary for connecting to external services.

 Connections may be defined in the following ways:
@@ -77,7 +77,7 @@ convenience property :py:meth:`~airflow.models.connection.Connection.as_json`. I

 .. code-block:: pycon

-    >>> from airflow.models.connection import Connection
+    >>> from airflow.sdk import Connection
     >>> c = Connection(
     ...     conn_id="some_conn",
     ...     conn_type="mysql",
@@ -94,7 +94,7 @@ In addition, same approach could be used to convert Connection from URI format t

 .. code-block:: pycon

-    >>> from airflow.models.connection import Connection
+    >>> from airflow.sdk import Connection
     >>> c = Connection(
     ...     conn_id="awesome_conn",
     ...     description="Example Connection",
diff --git a/airflow-core/docs/howto/custom-operator.rst b/airflow-core/docs/howto/custom-operator.rst
index b76a2277fbfea..d6206166e1211 100644
--- a/airflow-core/docs/howto/custom-operator.rst
+++ b/airflow-core/docs/howto/custom-operator.rst
@@ -24,7 +24,7 @@ Creating a custom Operator

 Airflow allows you to create new operators to suit your or your team's requirements. This extensibility is one of the many features which make Apache Airflow powerful.

-You can create any operator you want by extending the :class:`airflow.models.baseoperator.BaseOperator`
+You can create any operator you want by extending the public SDK base class :class:`~airflow.sdk.BaseOperator`.

 There are two methods that you need to override in a derived class:
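+To make this concrete, here is a minimal sketch of a custom operator that overrides both methods
+(the operator name and its ``name`` parameter are illustrative):
+
+.. code-block:: python
+
+    from airflow.sdk import BaseOperator
+
+
+    class HelloOperator(BaseOperator):
+        def __init__(self, name: str, **kwargs):
+            # The constructor collects the operator's parameters.
+            super().__init__(**kwargs)
+            self.name = name
+
+        def execute(self, context):
+            # ``execute`` runs when the task executes; its return value
+            # is pushed to XCom by default.
+            message = f"Hello {self.name}!"
+            print(message)
+            return message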
diff --git a/airflow-core/docs/public-airflow-interface.rst b/airflow-core/docs/public-airflow-interface.rst
index aa5e3b5dc1bee..b0b7cfe5af23a 100644
--- a/airflow-core/docs/public-airflow-interface.rst
+++ b/airflow-core/docs/public-airflow-interface.rst
@@ -15,6 +15,17 @@ specific language governing permissions and limitations
 under the License.

+**PUBLIC INTERFACE FOR AIRFLOW 3.0+**
+=====================================
+
+.. warning::
+
+    **This documentation covers the Public Interface for Airflow 3.0+**
+
+    If you are using Airflow 2.x, please refer to the
+    `Airflow 2.11 Public Interface Documentation `_
+    for the legacy interface.
+
 Public Interface of Airflow
 ...........................
@@ -25,6 +36,14 @@ and extending Airflow capabilities by writing new executors, plugins, operators
 Public Interface can be useful for building custom tools and integrations
 with other systems, and for automating certain aspects of the Airflow workflow.

+The Airflow Task SDK, exposed through the :doc:`airflow.sdk namespace `, is the primary public
+interface for DAG Authors and for task execution. Direct access to the metadata database
+from task code is no longer allowed. Instead, use the :doc:`Stable REST API `,
+`Python Client `_, or Task Context methods.
+
+For comprehensive Task SDK documentation, see the `Task SDK Reference `_.
+
 Using Airflow Public Interfaces
 ===============================
@@ -56,13 +75,65 @@ way, the Stable REST API is recommended.

 Using the Public Interface for DAG Authors
 ==========================================

+The primary interface for DAG Authors is the :doc:`airflow.sdk namespace `.
+This provides a stable, well-defined interface for creating DAGs and tasks that is not subject to internal
+implementation changes. The goal of this change is to decouple DAG authoring from Airflow internals (Scheduler,
+API Server, etc.), providing a version-agnostic interface for writing and maintaining DAGs across Airflow versions.
+
+**Key Imports from airflow.sdk:**
+
+**Classes:**
+
+* ``Asset``
+* ``BaseHook``
+* ``BaseNotifier``
+* ``BaseOperator``
+* ``BaseOperatorLink``
+* ``BaseSensorOperator``
+* ``Connection``
+* ``Context``
+* ``DAG``
+* ``EdgeModifier``
+* ``Label``
+* ``ObjectStoragePath``
+* ``Param``
+* ``TaskGroup``
+* ``Variable``
+
+**Decorators and Functions:**
+
+* ``@asset``
+* ``@dag``
+* ``@setup``
+* ``@task``
+* ``@task_group``
+* ``@teardown``
+* ``chain``
+* ``chain_linear``
+* ``cross_downstream``
+* ``get_current_context``
+* ``get_parsing_context``
+
+**Migration from Airflow 2.x:**
+
+For detailed migration instructions from Airflow 2.x to 3.x, including import changes and other breaking changes,
+see the :doc:`Migration Guide `.
+
+For an exhaustive list of available classes, decorators, and functions, check ``airflow.sdk.__all__``.
+
+All DAGs should update imports to use ``airflow.sdk`` instead of referencing internal Airflow modules directly.
+Legacy import paths (e.g., ``airflow.models.dag.DAG``, ``airflow.decorators.task``) are deprecated and will be
+removed in a future Airflow version.
+
 Dags
-----
+====

 The DAG is Airflow's core entity that represents a recurring workflow. You can create a DAG by
-instantiating the :class:`~airflow.models.dag.DAG` class in your DAG file. You can also instantiate
-them via :class:`~airflow.models.dagbag.DagBag` class that reads dags from a file or a folder. Dags
-can also have parameters specified via :class:`~airflow.sdk.definitions.param.Param` class.
+instantiating the :class:`~airflow.sdk.DAG` class in your DAG file. Dags can also have parameters
+specified via the :class:`~airflow.sdk.Param` class.
+
+The recommended way to create DAGs is to use the :func:`~airflow.sdk.dag` decorator
+from the airflow.sdk namespace.
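+A minimal sketch of this decorator-based style (the dag and task names are illustrative):
+
+.. code-block:: python
+
+    from datetime import datetime
+
+    from airflow.sdk import dag, task
+
+
+    @dag(schedule=None, start_date=datetime(2025, 1, 1), catchup=False)
+    def my_example_dag():
+        @task
+        def hello():
+            print("hello")
+
+        hello()
+
+
+    my_example_dag()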
 Airflow has a set of example dags that you can use to learn how to write dags
@@ -77,69 +148,86 @@ You can read more about dags in :doc:`Dags `.

 References for the modules used in dags are here:

-.. toctree::
-    :includehidden:
-    :glob:
-    :maxdepth: 1
+.. note::
+    The airflow.sdk namespace provides the primary interface for DAG Authors.
+    For detailed API documentation, see the `Task SDK Reference `_.

-    _api/airflow/models/dag/index
-    _api/airflow/models/dagbag/index
-    _api/airflow/models/param/index
+.. note::
+    The :class:`~airflow.models.dagbag.DagBag` class is used internally by Airflow for loading DAGs
+    from files and folders. DAG Authors should use the :class:`~airflow.sdk.DAG` class from the
+    airflow.sdk namespace instead.

-Properties of a :class:`~airflow.models.dagrun.DagRun` can also be referenced in things like :ref:`Templates `.
-
-.. toctree::
-    :includehidden:
-    :glob:
-    :maxdepth: 1
-
-    _api/airflow/models/dagrun/index
+.. note::
+    The :class:`~airflow.models.dagrun.DagRun` class is used internally by Airflow for DAG run
+    management. DAG Authors should access DAG run information through the Task Context via
+    :func:`~airflow.sdk.get_current_context` or use the :class:`~airflow.sdk.types.DagRunProtocol`
+    interface.

 .. _pythonapi:operators:

 Operators
---------
+=========
+
+The base classes :class:`~airflow.sdk.BaseOperator` and :class:`~airflow.sdk.BaseSensorOperator` are public and may be extended to make new operators.

-The base classes :class:`~airflow.models.baseoperator.BaseOperator` and :class:`~airflow.sensors.base.BaseSensorOperator` are public and may be extended to make new operators.

 Subclasses of BaseOperator which are published in Apache Airflow are public in *behavior* but not in *structure*. That is to say, the Operator's parameters and behavior are governed by semver but the methods are subject to change at any time.

 Task Instances
--------------
+==============

-Task instances are the individual runs of a single task in a DAG (in a DAG Run). They are available in the context
-passed to the execute method of the operators via the :class:`~airflow.models.taskinstance.TaskInstance` class.
-
-.. toctree::
-    :includehidden:
-    :glob:
-    :maxdepth: 1
-
-    _api/airflow/models/taskinstance/index
+Task instances are the individual runs of a single task in a DAG (in a DAG Run). Task instances are accessed through
+the Task Context via :func:`~airflow.sdk.get_current_context`. Direct database access is not possible.
+
+.. note::
+    Task Context is part of the airflow.sdk namespace.
+    For detailed API documentation, see the `Task SDK Reference `_.

 Task Instance Keys
------------------
+==================

 Task instance keys are unique identifiers of task instances in a DAG (in a DAG Run). A key is a tuple that consists of
-``dag_id``, ``task_id``, ``run_id``, ``try_number``, and ``map_index``. The key of a task instance can be retrieved via
-:meth:`~airflow.models.taskinstance.TaskInstance.key`.
+``dag_id``, ``task_id``, ``run_id``, ``try_number``, and ``map_index``.

-.. toctree::
-    :includehidden:
-    :glob:
-    :maxdepth: 1
+Direct access to task instance keys via the :class:`~airflow.models.taskinstance.TaskInstance`
+model is no longer allowed from task code. Instead, use the Task Context via :func:`~airflow.sdk.get_current_context`
+to access task instance information.
+
+Example of accessing task instance information through Task Context:
+
+.. code-block:: python
+
+    from airflow.sdk import get_current_context
+
+
+    def my_task():
+        context = get_current_context()
+        ti = context["ti"]
+
+        dag_id = ti.dag_id
+        task_id = ti.task_id
+        run_id = ti.run_id
+        try_number = ti.try_number
+        map_index = ti.map_index
+
+        print(f"Task: {dag_id}.{task_id}, Run: {run_id}, Try: {try_number}, Map Index: {map_index}")
+
+.. note::
+    The :class:`~airflow.models.taskinstancekey.TaskInstanceKey` class is used internally by Airflow
+    for identifying task instances. DAG Authors should access task instance information through the
+    Task Context via :func:`~airflow.sdk.get_current_context` instead.

-    _api/airflow/models/taskinstancekey/index

 .. _pythonapi:hooks:

 Hooks
-----
+=====

 Hooks are interfaces to external platforms and databases, implementing a common interface
 when possible and acting as building blocks for operators. All hooks
-are derived from :class:`~airflow.hooks.base.BaseHook`.
+are derived from :class:`~airflow.sdk.bases.hook.BaseHook`.

 Airflow has a set of Hooks that are considered public. You are free to extend their functionality
 by extending them:
@@ -152,14 +240,44 @@ by extending them:

     _api/airflow/hooks/index

 Public Airflow utilities
------------------------
+========================

-When writing or extending Hooks and Operators, DAG authors and developers can
+When writing or extending Hooks and Operators, DAG Authors and developers can
 use the following classes:

-* The :class:`~airflow.models.connection.Connection`, which provides access to external service credentials and configuration.
-* The :class:`~airflow.models.variable.Variable`, which provides access to Airflow configuration variables.
-* The :class:`~airflow.models.xcom.XCom` which are used to access to inter-task communication data.
+* The :class:`~airflow.sdk.Connection`, which provides access to external service credentials and configuration.
+* The :class:`~airflow.sdk.Variable`, which provides access to Airflow configuration variables.
+* The :class:`~airflow.sdk.execution_time.xcom.XCom` class, which is used to access inter-task communication data.
+
+Connection and Variable operations should be performed through the Task Context using
+:func:`~airflow.sdk.get_current_context` and the task instance's methods, or through the airflow.sdk namespace.
+Direct database access to :class:`~airflow.models.connection.Connection` and :class:`~airflow.models.variable.Variable`
+models is no longer allowed from task code.
+
+Example of accessing Connections and Variables through Task Context:
+
+.. code-block:: python
+
+    from airflow.sdk import get_current_context
+
+
+    def my_task():
+        context = get_current_context()
+
+        conn = context["conn"]
+        my_connection = conn.get("my_connection_id")
+
+        var = context["var"]
+        my_variable = var.value.get("my_variable_name")
+
+Example of using the airflow.sdk namespace directly:
+
+.. code-block:: python
+
+    from airflow.sdk import Connection, Variable
+
+    conn = Connection.get("my_connection_id")
+    var = Variable.get("my_variable_name")

 You can read more about the public Airflow utilities in :doc:`howto/connection`,
 :doc:`core-concepts/variables`, :doc:`core-concepts/xcoms`
@@ -167,18 +285,13 @@ You can read more about the public Airflow utilities in :doc:`howto/connection`,

 References for the classes used for the utilities are here:

-.. toctree::
-    :includehidden:
-    :glob:
-    :maxdepth: 1
-
-    _api/airflow/models/connection/index
-    _api/airflow/models/variable/index
-    _api/airflow/models/xcom/index
+.. note::
+    Connection, Variable, and XCom classes are now part of the airflow.sdk namespace.
+    For detailed API documentation, see the `Task SDK Reference `_.

 Public Exceptions
-----------------
+=================

 When writing custom Operators and Hooks, you can handle and raise public Exceptions that Airflow
 exposes:
@@ -191,7 +304,7 @@ exposes:

     _api/airflow/exceptions/index

 Public Utility classes
----------------------
+======================

 .. toctree::
     :includehidden:
     :glob:
     :maxdepth: 1
@@ -215,7 +328,7 @@ that do not require plugins - you can read more about them in :doc:`howto/custom

 Here are the ways Plugins can be used to extend Airflow:

 Triggers
--------
+========

 Airflow uses Triggers to implement ``asyncio`` compatible Deferrable Operators.
 All Triggers derive from :class:`~airflow.triggers.base.BaseTrigger`.
@@ -233,7 +346,7 @@ by extending them:

 You can read more about Triggers in :doc:`authoring-and-scheduling/deferring`.

 Timetables
----------
+==========

 Custom timetable implementations provide Airflow's scheduler additional logic to
 schedule DAG runs in ways not possible with built-in schedule expressions.
@@ -251,7 +364,7 @@ by extending them:

 You can read more about Timetables in :doc:`howto/timetable`.

 Listeners
--------
+=========

 Listeners enable you to respond to DAG/Task lifecycle events.
@@ -264,11 +377,8 @@ can be implemented to respond to DAG/Task lifecycle events.

 You can read more about Listeners in :doc:`administration-and-deployment/listeners`.

-..
-    TODO AIP-72: This class has been moved to task sdk but we cannot add a doc reference for it yet because task sdk doesn't have rendered docs yet.
-
 Extra Links
----------
+===========

 Extra links are dynamic links that can be added to Airflow independently of custom Operators. Normally they can be defined by the Operators, but plugins allow you to override the links on a global level.
@@ -285,7 +395,7 @@ You can read more about providers :doc:`providers `.

 Executors
--------
+=========

 Executors are the mechanism by which task instances get run. All executors are
 derived from :class:`~airflow.executors.base_executor.BaseExecutor`. There are several
@@ -305,10 +415,10 @@ You can read more about executors and how to write your own in :doc:`core-concep
 executors, and custom executors could not provide full functionality that built-in executors had.

 Secrets Backends
---------------
+================

 Airflow can be configured to rely on secrets backends to retrieve
-:class:`~airflow.models.connection.Connection` and :class:`~airflow.models.variable.Variable`.
+:class:`~airflow.sdk.Connection` and :class:`~airflow.sdk.Variable`.
 All secrets backends derive from :class:`~airflow.secrets.base_secrets.BaseSecretsBackend`.

 All Secrets Backend implementations are public. You can extend their functionality:
@@ -325,7 +435,7 @@ You can also find all the available Secrets Backends implemented in community pr
 in :doc:`apache-airflow-providers:core-extensions/secrets-backends`.

 Auth managers
------------
+=============

 Auth managers are responsible for user authentication and user authorization in Airflow. All auth managers are
 derived from :class:`~airflow.api_fastapi.auth.managers.base_auth_manager.BaseAuthManager`.
@@ -336,21 +446,21 @@ public, but the different implementations of auth managers are not (i.e. FabAuth

 You can read more about auth managers and how to write your own in :doc:`core-concepts/auth-manager/index`.

 Connections
-----------
+===========

 When creating Hooks, you can add custom Connections.
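+To make this concrete, here is a minimal sketch of a custom hook that reads its credentials from an
+Airflow Connection (the hook name and ``conn_id`` are illustrative):
+
+.. code-block:: python
+
+    from airflow.sdk import BaseHook
+
+
+    class MyServiceHook(BaseHook):
+        def __init__(self, my_conn_id: str = "my_service_default"):
+            super().__init__()
+            self.my_conn_id = my_conn_id
+
+        def get_conn(self):
+            # Resolve the Connection by id; host/login/password come from
+            # wherever the deployment stores connections (env, db, secrets backend).
+            conn = self.get_connection(self.my_conn_id)
+            return {"host": conn.host, "login": conn.login, "password": conn.password}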
 You can read more about connections in :doc:`apache-airflow-providers:core-extensions/connections` for
 available Connections implemented in the community providers.

 Extra Links
----------
+===========

 When creating Hooks, you can add custom Extra Links that are displayed when the tasks are run.
 You can find out more about extra links in :doc:`apache-airflow-providers:core-extensions/extra-links`
 that also shows available extra links implemented in the community providers.

 Logging and Monitoring
---------------------
+======================

 You can extend the way logs are written by Airflow. You can find out more about log writing in
 :doc:`administration-and-deployment/logging-monitoring/index`.
@@ -359,40 +469,44 @@ The :doc:`apache-airflow-providers:core-extensions/logging` that also shows avai
 implemented in the community providers.

 Decorators
---------
-DAG authors can use decorators to author dags using the :doc:`TaskFlow ` concept.
-All Decorators derive from :class:`~airflow.decorators.base.TaskDecorator`.
+==========
+DAG Authors can use decorators to author dags using the :doc:`TaskFlow ` concept.
+All Decorators derive from :class:`~airflow.sdk.bases.decorator.TaskDecorator`.
+
+The primary decorators for DAG Authors are now in the airflow.sdk namespace:
+:func:`~airflow.sdk.dag`, :func:`~airflow.sdk.task`, :func:`~airflow.sdk.asset`,
+:func:`~airflow.sdk.setup`, :func:`~airflow.sdk.task_group`, :func:`~airflow.sdk.teardown`,
+:func:`~airflow.sdk.chain`, :func:`~airflow.sdk.chain_linear`, :func:`~airflow.sdk.cross_downstream`,
+:func:`~airflow.sdk.get_current_context` and :func:`~airflow.sdk.get_parsing_context`.

 Airflow has a set of Decorators that are considered public. You are free to extend their functionality
 by extending them:

-.. toctree::
-    :includehidden:
-    :maxdepth: 1
-
-    _api/airflow/decorators/index
+.. note::
+    Decorators are now part of the airflow.sdk namespace.
+    For detailed API documentation, see the `Task SDK Reference `_.

 You can read more about creating custom Decorators in :doc:`howto/create-custom-decorator`.

 Email notifications
------------------
+===================

 Airflow has a built-in way of sending email notifications and it allows you to extend it by adding custom
 email notification classes.
 You can read more about email notifications in :doc:`howto/email-config`.

 Notifications
------------
+=============

 Airflow has a built-in extensible way of sending notifications using the various ``on_*_callback``.
 You can read more about notifications in :doc:`howto/notifications`.

 Cluster Policies
---------------
+================

 Cluster Policies are the way to dynamically apply cluster-wide policies to the dags being parsed or tasks
 being executed.
 You can read more about Cluster Policies in :doc:`administration-and-deployment/cluster-policies`.

 Lineage
------
+=======

 Airflow can help track the origins of data, what happens to it, and where it moves over time.
 You can read more about lineage in :doc:`administration-and-deployment/lineage`.
@@ -418,3 +532,49 @@ but in Airflow they are not parts of the Public Interface and might change any t

 * Python classes, except those explicitly mentioned in this document, are considered internal
   implementation details, and you should not assume they will be maintained in a backwards-compatible way.
+
+**Direct metadata database access from task code is no longer allowed**.
+Task code cannot directly access the metadata database to query DAG state, task history,
+or DAG runs.
+Instead, use one of the following alternatives:
+
+* **Task Context**: Use :func:`~airflow.sdk.get_current_context` to access task instance
+  information and methods like :meth:`~airflow.sdk.types.RuntimeTaskInstanceProtocol.get_dr_count`,
+  :meth:`~airflow.sdk.types.RuntimeTaskInstanceProtocol.get_dagrun_state`, and
+  :meth:`~airflow.sdk.types.RuntimeTaskInstanceProtocol.get_task_states`.
+
+* **REST API**: Use the :doc:`Stable REST API ` for programmatic
+  access to Airflow metadata.
+
+* **Python Client**: Use the `Python Client `_ for Python-based
+  interactions with Airflow.
+
+This change improves architectural separation and enables remote execution capabilities.
+
+Example of using Task Context instead of direct database access:
+
+.. code-block:: python
+
+    from datetime import datetime
+
+    from airflow.sdk import dag, get_current_context, task
+
+
+    @dag(dag_id="example_dag", start_date=datetime(2025, 1, 1), schedule="@hourly", tags=["misc"], catchup=False)
+    def example_dag():
+        @task(task_id="check_dagrun_state")
+        def check_state():
+            context = get_current_context()
+            ti = context["ti"]
+            dag_run = context["dag_run"]
+
+            # Use Task Context methods instead of direct DB access
+            dr_count = ti.get_dr_count(dag_id="example_dag")
+            dagrun_state = ti.get_dagrun_state(dag_id="example_dag", run_id=dag_run.run_id)
+
+            return f"DAG run count: {dr_count}, current state: {dagrun_state}"
+
+        check_state()
+
+
+    example_dag()
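+For completeness, here is a minimal sketch of the REST API alternative using the ``requests``
+library. This is an illustration only: the base URL, the authentication token, and the DAG id are
+placeholders, and the exact endpoint paths depend on your Airflow version and deployment.
+
+.. code-block:: python
+
+    import requests
+
+    BASE_URL = "http://localhost:8080/api/v2"  # placeholder deployment URL
+    HEADERS = {"Authorization": "Bearer <token>"}  # placeholder credentials
+
+    # List recent runs of a DAG without touching the metadata database.
+    resp = requests.get(f"{BASE_URL}/dags/example_dag/dagRuns", headers=HEADERS)
+    resp.raise_for_status()
+
+    for dag_run in resp.json()["dag_runs"]:
+        print(dag_run["dag_run_id"], dag_run["state"])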