Merged
1 change: 1 addition & 0 deletions task-sdk/docs/conf.py
@@ -58,6 +58,7 @@
autoapi_file_patterns = ["*.pyi", "*.py"]

html_theme = "sphinx_airflow_theme"
html_sidebars = {"**": ["localtoc.html", "globaltoc.html", "searchbox.html", "relations.html"]}


global_substitutions = {
99 changes: 99 additions & 0 deletions task-sdk/docs/dynamic-task-mapping.rst
@@ -0,0 +1,99 @@
 .. Licensed to the Apache Software Foundation (ASF) under one
    or more contributor license agreements.  See the NOTICE file
    distributed with this work for additional information
    regarding copyright ownership.  The ASF licenses this file
    to you under the Apache License, Version 2.0 (the
    "License"); you may not use this file except in compliance
    with the License.  You may obtain a copy of the License at

 ..   http://www.apache.org/licenses/LICENSE-2.0

 .. Unless required by applicable law or agreed to in writing,
    software distributed under the License is distributed on an
    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    KIND, either express or implied.  See the License for the
    specific language governing permissions and limitations
    under the License.

.. _sdk-dynamic-task-mapping:

Dynamic Task Mapping with Task SDK
==================================

Dynamic Task Mapping allows tasks defined with the Task SDK to generate
a variable number of task instances at runtime based on upstream data.
This is enabled via the ``expand()`` method on tasks, providing a way
to parallelize execution without knowing the number of tasks ahead of time.

Simple Mapping
--------------

Map over a Python list directly in the DAG:

.. code-block:: python

    from datetime import datetime

    from airflow.sdk import DAG, task


    @task
    def add_one(x: int):
        return x + 1


    @task
    def sum_it(values: list[int]):
        print(f"Total was {sum(values)}")


    with DAG(dag_id="dynamic-map-simple", start_date=datetime(2022, 1, 1)) as dag:
        summed = sum_it(values=add_one.expand(x=[1, 2, 3, 4, 5]))
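
As a rough mental model only, the DAG above computes the same values as the plain-Python sketch below, which does not use Airflow at all. Each element of the mapped list produces one call of ``add_one``, and the collected results are handed to the downstream function. The helper ``expand_locally`` is hypothetical and exists purely for illustration; in Airflow the mapped task instances are scheduled individually and may run in parallel.

```python
from typing import Callable, Iterable


def add_one(x: int) -> int:
    return x + 1


def expand_locally(func: Callable[[int], int], values: Iterable[int]) -> list[int]:
    """Hypothetical stand-in for expand(): call func once per input element."""
    return [func(v) for v in values]


def sum_it(values: list[int]) -> str:
    # Return the message instead of printing so it can be inspected.
    return f"Total was {sum(values)}"


mapped_results = expand_locally(add_one, [1, 2, 3, 4, 5])
message = sum_it(mapped_results)
print(message)
```

The sketch runs the calls sequentially in one process, so it says nothing about scheduling, retries, or how results are passed between tasks.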

Task-Generated Mapping
----------------------

Generate the list at runtime from an upstream task:

.. code-block:: python

    @task
    def make_list():
        # This could fetch data from an API, database, etc.
        return ["a", "b", "c"]


    @task
    def consume(item: str):
        print(item)


    with DAG(dag_id="dynamic-map-generated", start_date=datetime(2022, 1, 1)) as dag:
        consume.expand(item=make_list())

Details
-------

- Only keyword arguments can be passed to ``expand()``.
- Mapped inputs are provided to tasks as lazy proxy objects. To force
  evaluation into a concrete list, wrap the proxy in ``list()``.
- Combine static parameters with mapped ones using ``partial()``:

.. code-block:: python

    @task
    def add(x: int, y: int):
        return x + y


    with DAG(dag_id="map-with-partial", start_date=datetime(2022, 1, 1)) as dag:
        add.partial(y=10).expand(x=[1, 2, 3])
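
The points above can be illustrated with a standard-library analogy. This is a sketch, not Airflow code: ``functools.partial`` stands in for ``partial()`` by fixing ``y`` once, the built-in ``map`` stands in for ``expand()`` by varying ``x`` per call, and the lazy ``map`` object loosely plays the role of the proxy that has to be wrapped in ``list()`` to obtain a concrete list. The real proxy objects may behave differently in other respects.

```python
from functools import partial


def add(x: int, y: int) -> int:
    return x + y


# Fix y once for every call (the role partial() plays in the DAG).
add_ten = partial(add, y=10)

# map() is lazy: nothing is computed until the result is consumed.
lazy_results = map(add_ten, [1, 2, 3])

# list() forces evaluation into a concrete list.
results = list(lazy_results)
print(results)
```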

Advanced Usage
--------------

For advanced patterns—such as repeated mapping, cross-product mapping,
named mappings (via ``map_index_template``), and handling large
datasets—see the Airflow Core documentation:

`Dynamic Task Mapping in the Airflow Core docs <https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/dynamic-task-mapping.html>`_.
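
To give a feel for cross-product mapping before reading the Core docs, the sketch below counts the combinations in plain Python. It assumes the behaviour described there, namely that expanding over several keyword arguments creates one mapped task instance per combination of values. The input lists are made up for illustration and no Airflow API is used.

```python
from itertools import product

# Hypothetical inputs for two mapped keyword arguments, x and y.
x_values = [2, 4, 8]
y_values = [5, 10]

# One dict of keyword arguments per combination of x and y.
combinations = [{"x": x, "y": y} for x, y in product(x_values, y_values)]

print(len(combinations))
```

Because the number of combinations is the product of the input lengths, it grows quickly as more arguments are mapped.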
1 change: 1 addition & 0 deletions task-sdk/docs/index.rst
@@ -88,4 +88,5 @@ Refer to :doc:`api` for the complete reference of all decorators and classes.
    :hidden:

    examples
    dynamic-task-mapping
    api