diff --git a/task-sdk/docs/conf.py b/task-sdk/docs/conf.py index e3fbacae2ce1a..67cbda55f58f2 100644 --- a/task-sdk/docs/conf.py +++ b/task-sdk/docs/conf.py @@ -58,6 +58,7 @@ autoapi_file_patterns = ["*.pyi", "*.py"] html_theme = "sphinx_airflow_theme" +html_sidebars = {"**": ["localtoc.html", "globaltoc.html", "searchbox.html", "relations.html"]} global_substitutions = { diff --git a/task-sdk/docs/dynamic-task-mapping.rst b/task-sdk/docs/dynamic-task-mapping.rst new file mode 100644 index 0000000000000..84481bffcac81 --- /dev/null +++ b/task-sdk/docs/dynamic-task-mapping.rst @@ -0,0 +1,99 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. _sdk-dynamic-task-mapping: + +Dynamic Task Mapping with Task SDK +================================== + +Dynamic Task Mapping allows tasks defined with the Task SDK to generate +a variable number of task instances at runtime based on upstream data. +This is enabled via the ``expand()`` method on tasks, providing a way +to parallelize execution without knowing the number of tasks ahead of time. + +Simple Mapping +-------------- + +Map over a Python list directly in the DAG: + +.. 
code-block:: python + + from datetime import datetime + + from airflow.sdk import DAG, task + + + @task + def add_one(x: int): + return x + 1 + + + @task + def sum_it(values: list[int]): + print(f"Total was {sum(values)}") + + + with DAG(dag_id="dynamic-map-simple", start_date=datetime(2022, 1, 1)) as dag: + summed = sum_it(values=add_one.expand(x=[1, 2, 3, 4, 5])) + +Task-Generated Mapping +---------------------- + +Generate the list at runtime from an upstream task: + +.. code-block:: python + + @task + def make_list(): + # This could fetch data from an API, database, etc. + return ["a", "b", "c"] + + + @task + def consume(item: str): + print(item) + + + with DAG(dag_id="dynamic-map-generated", start_date=datetime(2022, 1, 1)) as dag: + consume.expand(item=make_list()) + +Details +----------- + +- Only keyword arguments can be passed to ``expand()``. +- Mapped inputs are provided to tasks as lazy proxy objects. To force + evaluation into a concrete list, wrap the proxy in ``list()``. +- Combine static parameters with mapped ones using ``partial()``: + + .. code-block:: python + + @task + def add(x: int, y: int): + return x + y + + + with DAG(dag_id="map-with-partial", start_date=datetime(2022, 1, 1)) as dag: + add.partial(y=10).expand(x=[1, 2, 3]) + +Advanced Usage +-------------- + +For advanced patterns—such as repeated mapping, cross-product mapping, +named mappings (via ``map_index_template``), and handling large +datasets—see the Airflow Core documentation: + +`Dynamic Task Mapping in the Airflow Core docs <https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/dynamic-task-mapping.html>`_. diff --git a/task-sdk/docs/index.rst b/task-sdk/docs/index.rst index 6ff3650b58711..24410cd4ece45 100644 --- a/task-sdk/docs/index.rst +++ b/task-sdk/docs/index.rst @@ -88,4 +88,5 @@ Refer to :doc:`api` for the complete reference of all decorators and classes. :hidden: examples + dynamic-task-mapping api