diff --git a/airflow-core/docs/index.rst b/airflow-core/docs/index.rst index d6773d3055793..f55ee3a3029b7 100644 --- a/airflow-core/docs/index.rst +++ b/airflow-core/docs/index.rst @@ -32,6 +32,15 @@ Airflow workflows are defined entirely in Python. This "workflows as code" appro - **Extensible**: The Airflow framework includes a wide range of built-in operators and can be extended to fit your needs. - **Flexible**: Airflow leverages the `Jinja `_ templating engine, allowing rich customizations. +.. _task-sdk-docs: + +Task SDK +======== + +For the Airflow Task SDK, see the standalone reference & tutorial site: + + https://airflow.apache.org/docs/task-sdk/stable/ + Dags ----------------------------------------- diff --git a/airflow-core/src/airflow/example_dags/example_asset_alias.py b/airflow-core/src/airflow/example_dags/example_asset_alias.py index d0a6a20188a92..5c4df1aa09c35 100644 --- a/airflow-core/src/airflow/example_dags/example_asset_alias.py +++ b/airflow-core/src/airflow/example_dags/example_asset_alias.py @@ -32,6 +32,7 @@ from __future__ import annotations +# [START example_asset_alias] import pendulum from airflow.sdk import DAG, Asset, AssetAlias, task @@ -94,3 +95,4 @@ def consume_asset_event_from_asset_alias(*, inlet_events=None): print(event) consume_asset_event_from_asset_alias() +# [END example_asset_alias] diff --git a/airflow-core/src/airflow/example_dags/example_assets.py b/airflow-core/src/airflow/example_dags/example_assets.py index 2bb3cffc527f8..3ab372112585c 100644 --- a/airflow-core/src/airflow/example_dags/example_assets.py +++ b/airflow-core/src/airflow/example_dags/example_assets.py @@ -52,6 +52,7 @@ from __future__ import annotations +# [START asset_def] import pendulum from airflow.providers.standard.operators.bash import BashOperator @@ -59,9 +60,7 @@ from airflow.timetables.assets import AssetOrTimeSchedule from airflow.timetables.trigger import CronTriggerTimetable -# [START asset_def] dag1_asset = Asset("s3://dag1/output_1.txt", extra={"hi": "bye"}) -# [END asset_def] dag2_asset = Asset("s3://dag2/output_1.txt", extra={"hi": "bye"}) dag3_asset = Asset("s3://dag3/output_3.txt", extra={"hi": "bye"}) @@ -189,3 +188,4 @@ task_id="conditional_asset_and_time_based_timetable", bash_command="sleep 5", ) +# [END asset_def] diff --git a/airflow-core/src/airflow/example_dags/example_dag_decorator.py b/airflow-core/src/airflow/example_dags/example_dag_decorator.py index c1f5b39233231..5d1312a888e80 100644 --- a/airflow-core/src/airflow/example_dags/example_dag_decorator.py +++ b/airflow-core/src/airflow/example_dags/example_dag_decorator.py @@ -17,6 +17,7 @@ # under the License. 
from __future__ import annotations +# [START dag_decorator_usage] from typing import TYPE_CHECKING, Any import httpx @@ -43,7 +44,6 @@ def execute(self, context: Context): return httpx.get(self.url).json() -# [START dag_decorator_usage] @dag( schedule=None, start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), diff --git a/airflow-core/src/airflow/example_dags/example_dynamic_task_mapping.py b/airflow-core/src/airflow/example_dags/example_dynamic_task_mapping.py index 9f4f45511cf04..750c3da1ec17b 100644 --- a/airflow-core/src/airflow/example_dags/example_dynamic_task_mapping.py +++ b/airflow-core/src/airflow/example_dags/example_dynamic_task_mapping.py @@ -19,6 +19,7 @@ from __future__ import annotations +# [START example_dynamic_task_mapping] from datetime import datetime from airflow.sdk import DAG, task @@ -56,3 +57,5 @@ def add_10(num): _get_nums = get_nums() _times_2 = times_2.expand(num=_get_nums) add_10.expand(num=_times_2) + +# [END example_dynamic_task_mapping] diff --git a/airflow-core/src/airflow/example_dags/example_setup_teardown_taskflow.py b/airflow-core/src/airflow/example_dags/example_setup_teardown_taskflow.py index e554b4f9cae89..8b68f85ef826d 100644 --- a/airflow-core/src/airflow/example_dags/example_setup_teardown_taskflow.py +++ b/airflow-core/src/airflow/example_dags/example_setup_teardown_taskflow.py @@ -19,6 +19,7 @@ from __future__ import annotations +# [START example_setup_teardown_taskflow] import pendulum from airflow.sdk import DAG, setup, task, task_group, teardown @@ -104,3 +105,4 @@ def inner_teardown(cluster_id): # and let's put section 1 inside the outer setup and teardown tasks section_1() +# [END example_setup_teardown_taskflow] diff --git a/airflow-core/src/airflow/example_dags/example_simplest_dag.py b/airflow-core/src/airflow/example_dags/example_simplest_dag.py index fad6f57950a9e..660f38c2e00e1 100644 --- a/airflow-core/src/airflow/example_dags/example_simplest_dag.py +++ b/airflow-core/src/airflow/example_dags/example_simplest_dag.py @@ -18,6 +18,7 @@ from __future__ import annotations +# [START simplest_dag] from airflow.sdk import dag, task @@ -30,4 +31,6 @@ def my_task(): my_task() +# [END simplest_dag] + example_simplest_dag() diff --git a/airflow-core/src/airflow/example_dags/example_task_group_decorator.py b/airflow-core/src/airflow/example_dags/example_task_group_decorator.py index 580b8bca5226a..5ed2a59ae3b64 100644 --- a/airflow-core/src/airflow/example_dags/example_task_group_decorator.py +++ b/airflow-core/src/airflow/example_dags/example_task_group_decorator.py @@ -19,12 +19,12 @@ from __future__ import annotations +# [START howto_task_group_decorator] import pendulum from airflow.sdk import DAG, task, task_group -# [START howto_task_group_decorator] # Creating Tasks @task def task_start(): diff --git a/airflow-core/src/airflow/example_dags/example_xcomargs.py b/airflow-core/src/airflow/example_dags/example_xcomargs.py index 6337cf482d98f..a64beb513baef 100644 --- a/airflow-core/src/airflow/example_dags/example_xcomargs.py +++ b/airflow-core/src/airflow/example_dags/example_xcomargs.py @@ -19,6 +19,7 @@ from __future__ import annotations +# [START example_xcomargs] import logging import pendulum @@ -63,3 +64,4 @@ def print_value(value, ts=None): xcom_args_b = print_value("second!") bash_op1 >> xcom_args_a >> xcom_args_b >> bash_op2 +# [END example_xcomargs] diff --git a/dev/README_RELEASE_AIRFLOW.md b/dev/README_RELEASE_AIRFLOW.md index 25a4f84653f28..a61ec4496a745 100644 --- a/dev/README_RELEASE_AIRFLOW.md +++ 
b/dev/README_RELEASE_AIRFLOW.md @@ -362,7 +362,7 @@ The command does the following: 3. Triggers S3 to GitHub Sync ```shell script - breeze workflow-run publish-docs --ref --site-env apache-airflow docker-stack + breeze workflow-run publish-docs --ref --site-env apache-airflow docker-stack task-sdk ``` The `--ref` parameter should be the tag of the release candidate you are publishing. @@ -387,7 +387,7 @@ The release manager publishes the documentation using GitHub Actions workflow the tag you use - pre-release tags go to staging. But you can also override it and specify the destination manually to be `live` or `staging`. -You should specify 'apache-airflow docker-stack' passed as packages to be +You should specify 'apache-airflow docker-stack task-sdk' as the packages to be built. After that step, the provider documentation should be available under https://airflow.stage.apache.org// diff --git a/dev/breeze/doc/03_developer_tasks.rst b/dev/breeze/doc/03_developer_tasks.rst index 86097163f2715..9bc70819903c0 100644 --- a/dev/breeze/doc/03_developer_tasks.rst +++ b/dev/breeze/doc/03_developer_tasks.rst @@ -224,6 +224,10 @@ short ``provider id`` (might be multiple of them). breeze build-docs +To build documentation for the Task SDK package, use the following command: .. code-block:: bash breeze build-docs task-sdk or you can use package filter. The filters are glob pattern matching full package names and can be used to select more than one package with single filter. diff --git a/dev/breeze/doc/images/output_build-docs.svg b/dev/breeze/doc/images/output_build-docs.svg index c5a579b4bb7a4..da2e06e5fb62b 100644 --- a/dev/breeze/doc/images/output_build-docs.svg +++ b/dev/breeze/doc/images/output_build-docs.svg @@ -232,7 +232,7 @@ microsoft.psrp | microsoft.winrm | mongo | mysql | neo4j | odbc | openai | openfaas | openlineage | opensearch |       opsgenie | oracle | pagerduty | papermill | pgvector | pinecone | postgres | presto | qdrant | redis | salesforce |    samba | segment | sendgrid | sftp | singularity | slack | smtp | snowflake | sqlite | ssh | standard | tableau |       -telegram | teradata | trino | vertica | weaviate | yandex | ydb | zendesk]...                                          +task-sdk | telegram | teradata | trino | vertica | weaviate | yandex | ydb | zendesk]...                               Build documents. diff --git a/dev/breeze/doc/images/output_build-docs.txt b/dev/breeze/doc/images/output_build-docs.txt index a4bcce3c3b163..6ec0cbe183a37 100644 --- a/dev/breeze/doc/images/output_build-docs.txt +++ b/dev/breeze/doc/images/output_build-docs.txt @@ -1 +1 @@ -44d93ca3ec21587ec90a45f0e4306997 +f64fad45c016c4dec0b180b3e7ff2826 diff --git a/dev/breeze/doc/images/output_release-management_add-back-references.svg b/dev/breeze/doc/images/output_release-management_add-back-references.svg index 8f96cee667af6..1ac6d317ab368 100644 --- a/dev/breeze/doc/images/output_release-management_add-back-references.svg +++ b/dev/breeze/doc/images/output_release-management_add-back-references.svg @@ -150,7 +150,7 @@ microsoft.psrp | microsoft.winrm | mongo | mysql | neo4j | odbc | openai | openfaas | openlineage | opensearch |       opsgenie | oracle | pagerduty | papermill | pgvector | pinecone | postgres | presto | qdrant | redis | salesforce |    samba | segment | sendgrid | sftp | singularity | slack | smtp | snowflake | sqlite | ssh | standard | tableau |       -telegram | teradata | trino | vertica | weaviate | yandex | ydb | zendesk]...                                          
+task-sdk | telegram | teradata | trino | vertica | weaviate | yandex | ydb | zendesk]...                               Command to add back references for documentation to make it backward compatible. diff --git a/dev/breeze/doc/images/output_release-management_add-back-references.txt b/dev/breeze/doc/images/output_release-management_add-back-references.txt index 681d95ca01f0d..5372e6d41c398 100644 --- a/dev/breeze/doc/images/output_release-management_add-back-references.txt +++ b/dev/breeze/doc/images/output_release-management_add-back-references.txt @@ -1 +1 @@ -7390a9c0a3344fbfb55a930cd1b3b599 +5d1541667a68b9dd5b149bdc80dfe082 diff --git a/dev/breeze/doc/images/output_release-management_publish-docs.svg b/dev/breeze/doc/images/output_release-management_publish-docs.svg index 8bb420dda2001..db5239192f4fa 100644 --- a/dev/breeze/doc/images/output_release-management_publish-docs.svg +++ b/dev/breeze/doc/images/output_release-management_publish-docs.svg @@ -207,7 +207,7 @@ microsoft.psrp | microsoft.winrm | mongo | mysql | neo4j | odbc | openai | openfaas | openlineage | opensearch |       opsgenie | oracle | pagerduty | papermill | pgvector | pinecone | postgres | presto | qdrant | redis | salesforce |    samba | segment | sendgrid | sftp | singularity | slack | smtp | snowflake | sqlite | ssh | standard | tableau |       -telegram | teradata | trino | vertica | weaviate | yandex | ydb | zendesk]...                                          +task-sdk | telegram | teradata | trino | vertica | weaviate | yandex | ydb | zendesk]...                               Command to publish generated documentation to airflow-site diff --git a/dev/breeze/doc/images/output_release-management_publish-docs.txt b/dev/breeze/doc/images/output_release-management_publish-docs.txt index ba4cd0b60a356..ecf2e8cee6c1e 100644 --- a/dev/breeze/doc/images/output_release-management_publish-docs.txt +++ b/dev/breeze/doc/images/output_release-management_publish-docs.txt @@ -1 +1 @@ -addf5db2bbbc4a47d012fbb106364c0f +e529c5ebc1394484334dc188e7ef8f2c diff --git a/dev/breeze/doc/images/output_setup.svg b/dev/breeze/doc/images/output_setup.svg index 5dda408adefbc..c747a1eea7f38 100644 --- a/dev/breeze/doc/images/output_setup.svg +++ b/dev/breeze/doc/images/output_setup.svg @@ -110,7 +110,7 @@ Tools that developers can use to configure Breeze ╭─ Common options ─────────────────────────────────────────────────────────────────────────────────────────────────────╮ ---help-hShow this message and exit. +--help-hShow this message and exit. ╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯ ╭─ Setup ──────────────────────────────────────────────────────────────────────────────────────────────────────────────╮ autocomplete                   Enables autocompletion of breeze commands.                                          
diff --git a/dev/breeze/doc/images/output_workflow-run_publish-docs.svg b/dev/breeze/doc/images/output_workflow-run_publish-docs.svg index 7a463d33c3d17..bd76a72646445 100644 --- a/dev/breeze/doc/images/output_workflow-run_publish-docs.svg +++ b/dev/breeze/doc/images/output_workflow-run_publish-docs.svg @@ -147,7 +147,7 @@ microsoft.psrp | microsoft.winrm | mongo | mysql | neo4j | odbc | openai | openfaas | openlineage | opensearch |       opsgenie | oracle | pagerduty | papermill | pgvector | pinecone | postgres | presto | qdrant | redis | salesforce |    samba | segment | sendgrid | sftp | singularity | slack | smtp | snowflake | sqlite | ssh | standard | tableau |       -telegram | teradata | trino | vertica | weaviate | yandex | ydb | zendesk]...                                          +task-sdk | telegram | teradata | trino | vertica | weaviate | yandex | ydb | zendesk]...                               Trigger publish docs to S3 workflow diff --git a/dev/breeze/doc/images/output_workflow-run_publish-docs.txt b/dev/breeze/doc/images/output_workflow-run_publish-docs.txt index b631dd614bffa..053b44077f68b 100644 --- a/dev/breeze/doc/images/output_workflow-run_publish-docs.txt +++ b/dev/breeze/doc/images/output_workflow-run_publish-docs.txt @@ -1 +1 @@ -8b3c4c52ca41d8988835da4088e35b78 +75a6fa574a8867b8a681f4792aa21a3c diff --git a/dev/breeze/src/airflow_breeze/global_constants.py b/dev/breeze/src/airflow_breeze/global_constants.py index b845777172383..650ee352ca24f 100644 --- a/dev/breeze/src/airflow_breeze/global_constants.py +++ b/dev/breeze/src/airflow_breeze/global_constants.py @@ -204,6 +204,7 @@ "docker-stack", "helm-chart", "apache-airflow-providers", + "task-sdk", ] diff --git a/dev/breeze/src/airflow_breeze/utils/selective_checks.py b/dev/breeze/src/airflow_breeze/utils/selective_checks.py index 11598daebd377..19b2082bd5af9 100644 --- a/dev/breeze/src/airflow_breeze/utils/selective_checks.py +++ b/dev/breeze/src/airflow_breeze/utils/selective_checks.py @@ -201,6 +201,7 @@ def __hash__(self): r"^providers-summary-docs", r"^docker-stack-docs", r"^chart", + r"^task-sdk/docs/", r"^task-sdk/src/", r"^airflow-ctl/src/", r"^airflow-core/tests/system", @@ -1194,6 +1195,8 @@ def docs_list_as_string(self) -> str | None: packages.append("helm-chart") if any(file.startswith("docker-stack-docs") for file in self._files): packages.append("docker-stack") + if any(file.startswith("task-sdk/src/") for file in self._files): + packages.append("task-sdk") if providers_affected: for provider in providers_affected: packages.append(provider.replace("-", ".")) diff --git a/dev/breeze/tests/test_selective_checks.py b/dev/breeze/tests/test_selective_checks.py index b8b2ea2f7e3c5..efa84e6f44c62 100644 --- a/dev/breeze/tests/test_selective_checks.py +++ b/dev/breeze/tests/test_selective_checks.py @@ -2048,7 +2048,7 @@ def test_expected_output_push( "run-tests": "true", "skip-providers-tests": "false", "docs-build": "true", - "docs-list-as-string": "apache-airflow amazon common.compat common.io common.sql " + "docs-list-as-string": "apache-airflow task-sdk amazon common.compat common.io common.sql " "databricks dbt.cloud ftp google microsoft.mssql mysql " "openlineage postgres sftp snowflake trino", "skip-pre-commits": ALL_SKIPPED_COMMITS_ON_NO_CI_IMAGE, diff --git a/devel-common/src/sphinx_exts/docs_build/docs_builder.py b/devel-common/src/sphinx_exts/docs_build/docs_builder.py index 59b3692c69570..684ea033a8664 100644 --- a/devel-common/src/sphinx_exts/docs_build/docs_builder.py +++ 
b/devel-common/src/sphinx_exts/docs_build/docs_builder.py @@ -121,6 +121,8 @@ def _src_dir(self) -> Path: return (AIRFLOW_CONTENT_ROOT_PATH / "providers").joinpath(*package_paths) / "docs" if self.package_name == "apache-airflow-ctl": return AIRFLOW_CONTENT_ROOT_PATH / "airflow-ctl" / "docs" + if self.package_name == "task-sdk": + return AIRFLOW_CONTENT_ROOT_PATH / "task-sdk" / "docs" console.print(f"[red]Unknown package name: {self.package_name}") sys.exit(1) @@ -335,6 +337,7 @@ def get_available_packages(include_suspended: bool = False, short_form: bool = F *provider_names, "apache-airflow-providers", "apache-airflow-ctl", + "task-sdk", "helm-chart", "docker-stack", ] diff --git a/devel-common/src/sphinx_exts/docs_build/spelling_checks.py b/devel-common/src/sphinx_exts/docs_build/spelling_checks.py index 0b69b4fb17f35..58197f1340976 100644 --- a/devel-common/src/sphinx_exts/docs_build/spelling_checks.py +++ b/devel-common/src/sphinx_exts/docs_build/spelling_checks.py @@ -71,17 +71,17 @@ def __lt__(self, other): line_no_b: int = other.line_no or 0 context_line_a: str = self.context_line or "" context_line_b: str = other.context_line or "" - left: tuple[Path, int, int, str, str] = ( + left: tuple[Path, int, str, str, str] = ( file_path_a, line_no_a, context_line_a, self.spelling or "", self.message or "", ) - right: tuple[Path, int, int, str, str] = ( + right: tuple[Path, int, str, str, str] = ( file_path_b, - line_no_b or 0, - context_line_b or 0, + line_no_b, + context_line_b, other.spelling or "", other.message or "", ) diff --git a/devel-common/src/sphinx_exts/exampleinclude.py b/devel-common/src/sphinx_exts/exampleinclude.py index 2731f6522bf93..7bb4b0b132e68 100644 --- a/devel-common/src/sphinx_exts/exampleinclude.py +++ b/devel-common/src/sphinx_exts/exampleinclude.py @@ -78,6 +78,7 @@ class ExampleInclude(SphinxDirective): "emphasize-lines": directives.unchanged_required, "class": directives.class_option, "name": directives.unchanged, + "caption": directives.unchanged_required, "diff": directives.unchanged_required, } diff --git a/docs/README.md b/docs/README.md index 914672b6af99d..9012cb4e8ef71 100644 --- a/docs/README.md +++ b/docs/README.md @@ -130,7 +130,7 @@ the auto-detection. The person who triggers the build (release manager) should specify the tag name of the docs to be published and the list of documentation packages to be published. Usually it is: -* Airflow: `apache-airflow docker-stack` (later we will add `airflow-ctl` and `task-sdk`) +* Airflow: `apache-airflow docker-stack task-sdk` (later we will add `airflow-ctl`) * Helm chart: `helm-chart` * Providers: `provider_id1 provider_id2` or `all providers` if all providers should be published. 
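A side note on the ``spelling_checks.py`` hunk above: the annotation fix matters because ``__lt__`` hands these tuples to Python's ordering, and tuples compare element-wise, so every position must hold comparable types. The old ``context_line_b or 0`` coercion could put an ``int`` where the other tuple holds a ``str``, which raises ``TypeError`` mid-sort. A minimal standalone sketch of the failure mode (illustrative values, not Airflow code):

```python
# Tuples sort element-wise; once elements of differing types meet, ordering fails.
left = ("docs/a.rst", 10, "some context line")  # context_line kept as str
right = ("docs/a.rst", 10, 0)                   # the old `or 0` coercion put an int here

try:
    sorted([left, right])
except TypeError as err:
    # '<' not supported between instances of 'int' and 'str'
    print(f"sorting failed: {err}")
```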
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index c5db829550d16..f54012138bcd8 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -561,6 +561,7 @@ ds dsl Dsn dsn +dst dts dttm dtypes @@ -642,6 +643,7 @@ exc executables execvp exitcode +expanduser explicit exportingmultiple ext @@ -737,6 +739,7 @@ Gentner geq getattr getboolean +getcwd getfqdn getframe getint @@ -1047,6 +1050,7 @@ ListGenerator ListInfoTypesResponse ListModelsPager ListSecretsPager +LiteralValue Liveness liveness livy @@ -1792,6 +1796,7 @@ templatable templateable Templated templated +templater Templating templating templatize diff --git a/task-sdk/docs/.gitignore b/task-sdk/docs/.gitignore new file mode 100644 index 0000000000000..69fa449dd96e2 --- /dev/null +++ b/task-sdk/docs/.gitignore @@ -0,0 +1 @@ +_build/ diff --git a/task-sdk/docs/api.rst b/task-sdk/docs/api.rst new file mode 100644 index 0000000000000..8605b234792a6 --- /dev/null +++ b/task-sdk/docs/api.rst @@ -0,0 +1,132 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +airflow.sdk API Reference +========================= + +This page documents the full public API exposed in Airflow 3.0+ via the Task SDK Python module. + +If something is not on this page, it is best to assume that it is not part of the public API and use of it is entirely at your own risk +-- we won't go out of our way to break it, but we make no promises either. + +Defining DAGs +------------- +.. autoapiclass:: airflow.sdk.DAG + + +Decorators +---------- +.. autoapifunction:: airflow.sdk.dag +.. autoapifunction:: airflow.sdk.task + +.. autoapifunction:: airflow.sdk.task_group + +.. autoapifunction:: airflow.sdk.setup + +.. autoapifunction:: airflow.sdk.teardown + +.. autoapifunction:: airflow.sdk.asset + + +Bases +----- +.. autoapiclass:: airflow.sdk.BaseOperator + +.. autoapiclass:: airflow.sdk.BaseSensorOperator + +.. autoapiclass:: airflow.sdk.BaseNotifier + +.. autoapiclass:: airflow.sdk.BaseOperatorLink + +.. autoapiclass:: airflow.sdk.BaseXCom + +.. autoapiclass:: airflow.sdk.XComArg + +.. autoapiclass:: airflow.sdk.PokeReturnValue + +Connections & Variables +----------------------- +.. autoapiclass:: airflow.sdk.Connection + +.. autoapiclass:: airflow.sdk.Variable + +Tasks & Operators +----------------- +.. autoapiclass:: airflow.sdk.TaskGroup + +.. autoapifunction:: airflow.sdk.get_current_context + +.. autoapifunction:: airflow.sdk.get_parsing_context + +.. autoapiclass:: airflow.sdk.Param + +Setting Dependencies +~~~~~~~~~~~~~~~~~~~~ +.. autoapifunction:: airflow.sdk.chain + +.. autoapifunction:: airflow.sdk.chain_linear + +.. autoapifunction:: airflow.sdk.cross_downstream + +.. 
autoapifunction:: airflow.sdk.literal + +Edges & Labels +~~~~~~~~~~~~~~ +.. autoapiclass:: airflow.sdk.EdgeModifier + +.. autoapiclass:: airflow.sdk.Label + +Assets +------ +.. autoapiclass:: airflow.sdk.Asset + +.. autoapiclass:: airflow.sdk.AssetAlias + +.. autoapiclass:: airflow.sdk.AssetAll + +.. autoapiclass:: airflow.sdk.AssetAny + +.. autoapiclass:: airflow.sdk.AssetWatcher + +.. autoapiclass:: airflow.sdk.Metadata + +I/O Helpers +----------- +.. autoapiclass:: airflow.sdk.ObjectStoragePath + +Execution Time Components +------------------------- +.. rubric:: Context +.. autoapiclass:: airflow.sdk.Context +.. autoapimodule:: airflow.sdk.execution_time.context + :members: + :undoc-members: + + +Everything else +--------------- + +.. autoapimodule:: airflow.sdk + :members: + :special-members: __version__ + :exclude-members: BaseOperator, DAG, dag, asset, Asset, AssetAlias, AssetAll, AssetAny, AssetWatcher, TaskGroup, XComArg, get_current_context, get_parsing_context + :undoc-members: + :imported-members: + :no-index: diff --git a/task-sdk/docs/conf.py b/task-sdk/docs/conf.py new file mode 100644 index 0000000000000..e3fbacae2ce1a --- /dev/null +++ b/task-sdk/docs/conf.py @@ -0,0 +1,86 @@ +# Disable Flake8 because of all the sphinx imports +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import sys +from pathlib import Path + +CONF_DIR = Path(__file__).parent.absolute() +sys.path.insert(0, str(CONF_DIR.parent.parent.joinpath("devel-common", "src", "sphinx_exts").resolve())) + +project = "Apache Airflow Task SDK" + +language = "en" +locale_dirs: list[str] = [] + +extensions = [ + "sphinx.ext.autodoc", + "autoapi.extension", + "sphinx.ext.intersphinx", + "exampleinclude", + "sphinxcontrib.spelling", +] + +autoapi_dirs = [CONF_DIR.joinpath("..", "src").resolve()] +autoapi_root = "api" +autoapi_ignore = [ + "*/airflow/sdk/execution_time", + "*/airflow/sdk/api", + "*/_internal*", +] +autoapi_options = [ + "undoc-members", + "members", + "imported-members", +] +autoapi_add_toctree_entry = False +autoapi_generate_api_docs = False + +autodoc_typehints = "description" + +# Prefer pyi over py files if both are found +autoapi_file_patterns = ["*.pyi", "*.py"] + +html_theme = "sphinx_airflow_theme" + + +global_substitutions = { + "experimental": "This is an :ref:`experimental feature <experimental>`.", +} + +rst_epilog = "\n".join(f".. 
|{key}| replace:: {replace}" for key, replace in global_substitutions.items()) + + +intersphinx_resolve_self = "airflow" +intersphinx_mapping = { + "airflow": ("https://airflow.apache.org/docs/apache-airflow/stable/", None), +} +# Suppress known warnings +suppress_warnings = [ + "autoapi.python_import_resolution", + "autodoc", +] + +exampleinclude_sourceroot = str(CONF_DIR.joinpath("..").resolve()) +spelling_show_suggestions = False +spelling_word_list_filename = [ + str(CONF_DIR.parent.parent.joinpath("docs", "spelling_wordlist.txt").resolve()) +] +spelling_ignore_importable_modules = True +spelling_ignore_contributor_names = True diff --git a/task-sdk/docs/examples.rst b/task-sdk/docs/examples.rst new file mode 100644 index 0000000000000..172ae4252602f --- /dev/null +++ b/task-sdk/docs/examples.rst @@ -0,0 +1,86 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +Examples +======== + +Basic Examples +-------------- + +Define a basic DAG and task in just a few lines of Python: + +.. exampleinclude:: ../../airflow-core/src/airflow/example_dags/example_simplest_dag.py + :language: python + :start-after: [START simplest_dag] + :end-before: [END simplest_dag] + :caption: Simplest DAG with :func:`@dag ` and :func:`@task ` + +Key Concepts +------------ +Defining DAGs +~~~~~~~~~~~~~ + +.. exampleinclude:: ../../airflow-core/src/airflow/example_dags/example_dag_decorator.py + :language: python + :start-after: [START dag_decorator_usage] + :end-before: [END dag_decorator_usage] + :caption: Using the :func:`@dag ` decorator with custom tasks and operators. + +Decorators +~~~~~~~~~~ + +.. exampleinclude:: ../../airflow-core/src/airflow/example_dags/example_task_group_decorator.py + :language: python + :start-after: [START howto_task_group_decorator] + :end-before: [END howto_task_group_decorator] + :caption: Group tasks using the :func:`@task_group ` decorator. + +.. exampleinclude:: ../../airflow-core/src/airflow/example_dags/example_setup_teardown_taskflow.py + :language: python + :start-after: [START example_setup_teardown_taskflow] + :end-before: [END example_setup_teardown_taskflow] + :caption: Define setup and teardown tasks with :func:`@setup ` and :func:`@teardown `. + +Tasks and Operators +~~~~~~~~~~~~~~~~~~~ + +.. exampleinclude:: ../../airflow-core/src/airflow/example_dags/example_dynamic_task_mapping.py + :language: python + :start-after: [START example_dynamic_task_mapping] + :end-before: [END example_dynamic_task_mapping] + :caption: Dynamic task mapping with ``expand()`` + +.. exampleinclude:: ../../airflow-core/src/airflow/example_dags/example_xcomargs.py + :language: python + :start-after: [START example_xcomargs] + :end-before: [END example_xcomargs] + :caption: Using ``XComArg`` to chain tasks based on return values. + +Assets +~~~~~~ + +.. 
exampleinclude:: ../../airflow-core/src/airflow/example_dags/example_assets.py + :language: python + :start-after: [START asset_def] + :end-before: [END asset_def] + :caption: Defining an :class:`Asset <airflow.sdk.Asset>` + +.. exampleinclude:: ../../airflow-core/src/airflow/example_dags/example_asset_alias.py + :language: python + :start-after: [START example_asset_alias] + :end-before: [END example_asset_alias] + :caption: Defining asset aliases with :class:`AssetAlias <airflow.sdk.AssetAlias>`. diff --git a/task-sdk/docs/index.rst b/task-sdk/docs/index.rst new file mode 100644 index 0000000000000..695f002d3559a --- /dev/null +++ b/task-sdk/docs/index.rst @@ -0,0 +1,92 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +Apache Airflow Task SDK +================================= + +The Apache Airflow Task SDK provides Python-native interfaces for defining DAGs, +executing tasks in isolated subprocesses, and interacting with Airflow resources +(e.g., Connections, Variables, XComs, Metrics, Logs, and OpenLineage events) at runtime. +It also includes core execution-time components to manage communication between the worker +and the Airflow scheduler/backend. + +This approach reduces boilerplate and keeps your DAG definitions concise and readable. + + +Installation +------------ +To install the Task SDK, run: + +.. code-block:: bash + + pip install apache-airflow-task-sdk + +Getting Started +--------------- +Define a basic DAG and task in just a few lines of Python: + +.. exampleinclude:: ../../airflow-core/src/airflow/example_dags/example_simplest_dag.py + :language: python + :start-after: [START simplest_dag] + :end-before: [END simplest_dag] + :caption: Simplest DAG with :func:`@dag <airflow.sdk.dag>` and :func:`@task <airflow.sdk.task>` + +Examples +-------- + +For more example DAGs and patterns, see the :doc:`examples` page. + +Key Concepts +------------ +Defining DAGs +~~~~~~~~~~~~~ +Use ``@dag`` to convert a function into an Airflow DAG. All nested ``@task`` calls +become part of the workflow. + +Decorators +~~~~~~~~~~ +Simplify task definitions using decorators: + +- :func:`@task <airflow.sdk.task>`: define tasks. +- :func:`@task_group <airflow.sdk.task_group>`: group related tasks into logical units. +- :func:`@setup <airflow.sdk.setup>` and :func:`@teardown <airflow.sdk.teardown>`: define setup and teardown tasks for DAGs and TaskGroups. + +Tasks and Operators +~~~~~~~~~~~~~~~~~~~ +Wrap Python callables with :func:`@task <airflow.sdk.task>` to create tasks, leverage dynamic task mapping with +``.expand()``, and pass data via ``XComArg``. You can also create traditional Operators +(e.g., sensors) via classes imported from the SDK: + + - **BaseOperator**, **Sensor**, **OperatorLink**, **Notifier**, **XComArg**, etc. 
+ (see the **API reference** section for details) + +Assets +~~~~~~ +Model data as assets and emit asset events to downstream tasks with the SDK's asset library under +``airflow.sdk.definitions.asset``. You can use: + +- :func:`@asset <airflow.sdk.asset>`, :class:`~airflow.sdk.AssetAlias`, etc. (see the **API reference** section below) + +Refer to :doc:`api` for the complete reference of all decorators and classes. + +.. toctree:: + :hidden: + + examples + api diff --git a/task-sdk/pyproject.toml b/task-sdk/pyproject.toml index d45cb26ed7e64..5119bd7e026c8 100644 --- a/task-sdk/pyproject.toml +++ b/task-sdk/pyproject.toml @@ -144,6 +144,9 @@ dev = [ "apache-airflow-providers-standard", "apache-airflow-devel-common", ] +docs = [ + "apache-airflow-devel-common[docs]", +] [tool.uv.sources] # These names must match the names as defined in the pyproject.toml of the workspace items, # *not* the workspace folder paths diff --git a/task-sdk/src/airflow/sdk/__init__.pyi b/task-sdk/src/airflow/sdk/__init__.pyi new file mode 100644 index 0000000000000..985e616af1c8b --- /dev/null +++ b/task-sdk/src/airflow/sdk/__init__.pyi @@ -0,0 +1,95 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from airflow.sdk.bases.notifier import BaseNotifier as BaseNotifier +from airflow.sdk.bases.operator import ( + BaseOperator as BaseOperator, + chain as chain, + chain_linear as chain_linear, + cross_downstream as cross_downstream, +) +from airflow.sdk.bases.operatorlink import BaseOperatorLink as BaseOperatorLink +from airflow.sdk.bases.sensor import ( + BaseSensorOperator as BaseSensorOperator, + PokeReturnValue as PokeReturnValue, +) +from airflow.sdk.definitions.asset import ( + Asset as Asset, + AssetAlias as AssetAlias, + AssetAll as AssetAll, + AssetAny as AssetAny, + AssetWatcher as AssetWatcher, +) +from airflow.sdk.definitions.asset.decorators import asset as asset +from airflow.sdk.definitions.asset.metadata import Metadata as Metadata +from airflow.sdk.definitions.connection import Connection as Connection +from airflow.sdk.definitions.context import ( + Context as Context, + get_current_context as get_current_context, + get_parsing_context as get_parsing_context, +) +from airflow.sdk.definitions.dag import DAG as DAG, dag as dag +from airflow.sdk.definitions.decorators import setup as setup, task as task, teardown as teardown +from airflow.sdk.definitions.decorators.task_group import task_group as task_group +from airflow.sdk.definitions.edges import EdgeModifier as EdgeModifier, Label as Label +from airflow.sdk.definitions.param import Param as Param +from airflow.sdk.definitions.taskgroup import TaskGroup as TaskGroup +from airflow.sdk.definitions.template import literal as literal +from airflow.sdk.definitions.variable import Variable as Variable +from airflow.sdk.definitions.xcom_arg import XComArg as XComArg +from airflow.sdk.execution_time.cache import SecretCache as SecretCache +from airflow.sdk.io.path import ObjectStoragePath as ObjectStoragePath + +__all__ = [ + "__version__", + "Asset", + "AssetAlias", + "AssetAll", + "AssetAny", + "AssetWatcher", + "BaseNotifier", + "BaseOperator", + "BaseOperatorLink", + "BaseSensorOperator", + "Connection", + "Context", + "DAG", + "EdgeModifier", + "Label", + "Metadata", + "ObjectStoragePath", + "Param", + "PokeReturnValue", + "SecretCache", + "TaskGroup", + "Variable", + "XComArg", + "asset", + "chain", + "chain_linear", + "cross_downstream", + "dag", + "get_current_context", + "get_parsing_context", + "literal", + "setup", + "task", + "task_group", + "teardown", +] + +__version__: str diff --git a/task-sdk/src/airflow/sdk/definitions/context.py b/task-sdk/src/airflow/sdk/definitions/context.py index 6580b8bcf5e81..082ad36202ec2 100644 --- a/task-sdk/src/airflow/sdk/definitions/context.py +++ b/task-sdk/src/airflow/sdk/definitions/context.py @@ -118,6 +118,8 @@ class AirflowParsingContext(NamedTuple): If these values are not None, they will contain the specific DAG and Task ID that Airflow is requesting to execute. You can use these for optimizing dynamically generated DAG files. + + You can obtain the current values via :py:func:`.get_parsing_context`. """ dag_id: str | None diff --git a/task-sdk/src/airflow/sdk/definitions/dag.py b/task-sdk/src/airflow/sdk/definitions/dag.py index 8b25e6a8b3919..11c419cef04d4 100644 --- a/task-sdk/src/airflow/sdk/definitions/dag.py +++ b/task-sdk/src/airflow/sdk/definitions/dag.py @@ -278,7 +278,7 @@ class DAG: :param schedule: If provided, this defines the rules according to which DAG runs are scheduled. Possible values include a cron expression string, timedelta object, Timetable, or list of Asset objects. - See also :doc:`/howto/timetable`. + See also :external:doc:`howto/timetable`. 
:param start_date: The timestamp from which the scheduler will attempt to backfill. If this is not provided, backfilling must be done manually with an explicit time range. @@ -352,7 +352,7 @@ class DAG: :param tags: List of tags to help filtering DAGs in the UI. :param owner_links: Dict of owners and their links, that will be clickable on the DAGs view UI. Can be used as an HTTP link (for example the link to your Slack channel), or a mailto link. - e.g: {"dag_owner": "https://airflow.apache.org/"} + e.g.: ``{"dag_owner": "https://airflow.apache.org/"}`` :param auto_register: Automatically register this DAG when it is used in a ``with`` block :param fail_fast: Fails currently running tasks when task in DAG fails. **Warning**: A fail stop dag can only have tasks with the default trigger rule ("all_success"). diff --git a/task-sdk/src/airflow/sdk/io/path.py b/task-sdk/src/airflow/sdk/io/path.py index 5a3517f6527b8..5b66e09d110a0 100644 --- a/task-sdk/src/airflow/sdk/io/path.py +++ b/task-sdk/src/airflow/sdk/io/path.py @@ -237,14 +237,19 @@ def read_block(self, offset: int, length: int, delimiter=None): Examples -------- - >>> read_block(0, 13) - b'Alice, 100\\nBo' - >>> read_block(0, 13, delimiter=b"\\n") - b'Alice, 100\\nBob, 200\\n' - - Use ``length=None`` to read to the end of the file. - >>> read_block(0, None, delimiter=b"\\n") - b'Alice, 100\\nBob, 200\\nCharlie, 300' + .. code-block:: pycon + + # Read the first 13 bytes (no delimiter) + >>> read_block(0, 13) + b'Alice, 100\nBo' + + # Read the first 13 bytes, extended to the next newline boundary + >>> read_block(0, 13, delimiter=b"\n") + b'Alice, 100\nBob, 200\n' + + # Read to the end of the file (length=None), stopping at newline boundaries + >>> read_block(0, None, delimiter=b"\n") + b'Alice, 100\nBob, 200\nCharlie, 300' See Also -------- diff --git a/task-sdk/tests/test_docs_inventory.py b/task-sdk/tests/test_docs_inventory.py new file mode 100644 index 0000000000000..a1308031d524f --- /dev/null +++ b/task-sdk/tests/test_docs_inventory.py @@ -0,0 +1,82 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import sys +from pathlib import Path + +# Add the SDK src directory to sys.path so that importlib loads our airflow.sdk module +sys.path.insert(0, str(Path(__file__).parent.parent / "src")) +import importlib +import shutil +import subprocess +import zlib + +import pytest + + +def read_inventory(inv_path: Path): + """ + Read a Sphinx objects.inv inventory file and return a mapping of documented full names to their entries. 
+ """ + inv: dict[str, tuple[str, str, str]] = {} + with inv_path.open("rb") as f: + f.readline() + f.readline() + f.readline() + f.readline() + data = zlib.decompress(f.read()).decode("utf-8").splitlines() + for line in data: + if not line.strip(): + continue + parts = line.split(None, 4) + if len(parts) != 5: + continue + name, domain_role, prio, location, dispname = parts + inv[name] = (domain_role, location, dispname) + return inv + + +@pytest.mark.skipif( + shutil.which("sphinx-build") is None, reason="sphinx-build not available, skipping docs inventory test" +) +def test_docs_inventory_matches_public_api(tmp_path): + """ + Build the HTML docs and compare the generated Sphinx inventory with the public API re-exports. + """ + docs_dir = Path(__file__).parent.parent / "docs" + build_dir = tmp_path / "build" + sphinx = shutil.which("sphinx-build") + subprocess.run([sphinx, "-b", "html", "-q", str(docs_dir), str(build_dir)], check=True) + inv_path = build_dir / "objects.inv" + assert inv_path.exists(), "objects.inv not found after docs build" + + inv = read_inventory(inv_path) + documented = { + name.rsplit(".", 1)[-1] + for name in inv.keys() + if name.startswith("airflow.sdk.") and name.count(".") == 2 + } + sdk = importlib.import_module("airflow.sdk") + public = set(getattr(sdk, "__all__", [])) - {"__version__"} + + extras = {"AirflowParsingContext"} + missing = public - documented + assert not missing, f"Public API items missing in docs: {missing}" + unexpected = (documented - public) - extras + assert not unexpected, f"Unexpected documented items: {unexpected}" diff --git a/task-sdk/tests/test_public_api.py b/task-sdk/tests/test_public_api.py new file mode 100644 index 0000000000000..dd2bf54980ce9 --- /dev/null +++ b/task-sdk/tests/test_public_api.py @@ -0,0 +1,74 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import importlib + + +def test_airflow_sdk_exports_exist(): + """ + Ensure that all names declared in airflow.sdk.__all__ are present as attributes on the module. + """ + sdk = importlib.import_module("airflow.sdk") + # Provide literal attribute for testing since it's declared in __all__ + template_mod = importlib.import_module("airflow.sdk.definitions.template") + setattr(sdk, "literal", getattr(template_mod, "literal")) + public_names = getattr(sdk, "__all__", []) + missing = [name for name in public_names if not hasattr(sdk, name)] + assert not missing, f"Missing exports in airflow.sdk: {missing}" + + +def test_airflow_sdk_no_unexpected_exports(): + """ + Ensure that no unexpected public attributes are present in airflow.sdk besides those in __all__. 
+ """ + sdk = importlib.import_module("airflow.sdk") + public = set(getattr(sdk, "__all__", [])) + actual = {name for name in dir(sdk) if not name.startswith("_")} + ignore = { + "__getattr__", + "__lazy_imports", + "SecretCache", + "TYPE_CHECKING", + "annotations", + "api", + "bases", + "definitions", + "execution_time", + "io", + "log", + "exceptions", + } + unexpected = actual - public - ignore + assert not unexpected, f"Unexpected exports in airflow.sdk: {sorted(unexpected)}" + + +def test_lazy_imports_match_public_api(): + """ + Ensure that the dynamic lazy-imports mapping matches the public names in __all__, + except for the version string. + """ + import airflow.sdk as sdk + + lazy = getattr(sdk, "__lazy_imports", {}) + expected = set(getattr(sdk, "__all__", [])) - {"__version__", "literal"} + ignore = {"SecretCache"} + actual = set(lazy.keys()) + missing = expected - actual + extra = actual - expected - ignore + assert not missing, f"__lazy_imports missing entries for: {sorted(missing)}" + assert not extra, f"__lazy_imports has unexpected entries: {sorted(extra)}"