@@ -15,11 +15,13 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Example DAG demonstrating the usage of the branching TaskFlow API decorators.
"""
Example DAG demonstrating the usage of branching TaskFlow API decorators.

It shows how to use standard Python ``@task.branch`` as well as the external Python
version ``@task.branch_external_python`` which calls an external Python interpreter and
the ``@task.branch_virtualenv`` which builds a temporary Python virtual environment.
This example shows how to use standard Python `@task.branch`, as well as
`@task.branch_external_python` and `@task.branch_virtualenv` for branching
logic executed in external Python interpreters or isolated virtual
environments.
"""

from __future__ import annotations
@@ -36,18 +38,59 @@

PATH_TO_PYTHON_BINARY = sys.executable


with DAG(
dag_id="example_branch_python_operator_decorator",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
schedule="@daily",
tags=["example", "example2"],
doc_md="""
> **Contributor:** One idea for how we keep docs in DAGs consistent at work: how about adding all the documentation as pydoc in the header of the file and defining it here as `doc_md=__doc__`?
>
> **Author:** Thanks for the suggestion; that's a nice idea, and I agree it would help keep DAG documentation consistent and co-located with the code.
>
> For this PR, I've intentionally kept the documentation inline via `doc_md` to align with the existing pattern used in the current example and tutorial DAGs. I wanted to keep the scope limited to addressing the missing Docs tab content without introducing a broader structural change.
>
> That said, I like the `doc_md = __doc__` approach and would be happy to explore it as a follow-up or incremental PR, especially if there's interest in standardizing this pattern across example DAGs. Happy to take guidance on whether that would be preferred going forward.
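
A minimal sketch of the suggested `doc_md=__doc__` pattern (the `airflow.sdk` import path is assumed for Airflow 3.x):

```python
"""Example DAG demonstrating branching; this module docstring doubles as the DAG docs."""

from airflow.sdk import DAG  # assumed import path; older versions use `from airflow import DAG`

with DAG(dag_id="example_branch_python_operator_decorator", doc_md=__doc__):
    ...  # task definitions go here; the Docs tab renders the docstring above
```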

### Branch Decorator: Runtime Path Selection

Branching enables conditional execution paths within a DAG by selecting
which downstream task(s) should run at runtime, while all other paths
are marked as skipped. This allows mutually exclusive workflows to be
expressed cleanly within a single DAG definition.

**How branching selects execution paths:**
- A branch task returns the `task_id` (or list of `task_id`s) corresponding
to the next task(s) that should execute
- Only the returned downstream task(s) are executed; all other immediate
downstream tasks are marked as skipped
- Skipped branches do not fail the DAG run and are treated as a normal
execution outcome

**Handling skipped branches downstream:**
- Tasks that follow a branching point must use trigger rules that account
for skipped upstream tasks (for example, `NONE_FAILED_MIN_ONE_SUCCESS`)
- Without appropriate trigger rules, downstream tasks may not execute
as expected due to skipped upstream states
- This behavior differs from short-circuiting, where all downstream
execution may be prevented entirely
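
A minimal sketch of a branch feeding a join task, assuming Airflow 3.x import paths (task names and the row-count threshold are illustrative):

```python
import pendulum
from airflow.providers.standard.operators.empty import EmptyOperator  # import path assumed
from airflow.sdk import DAG, task
from airflow.utils.trigger_rule import TriggerRule

with DAG(
    dag_id="branch_join_sketch",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
):

    @task.branch()
    def choose(row_count: int) -> str:
        # Return the task_id of the path to run; the other path is skipped
        return "small_path" if row_count < 1_000 else "large_path"

    small = EmptyOperator(task_id="small_path")
    large = EmptyOperator(task_id="large_path")
    # NONE_FAILED_MIN_ONE_SUCCESS lets the join run after either branch succeeds
    join = EmptyOperator(task_id="join", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)

    choose(row_count=500) >> [small, large]
    [small, large] >> join
```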

**Common use cases:**
- Conditional data processing based on runtime characteristics
(for example, small vs. large datasets)
- Environment-driven workflows where different paths are selected
dynamically
- Optional enrichment or validation steps that should only run when needed
- Mutually exclusive downstream actions within a single DAG

**Branching vs. Python if/else:**
Branching is not equivalent to a Python `if/else` statement. All possible
branches exist in the DAG graph at parse time, and the branch task selects
which path is taken during execution.

📖 **Related documentation**
https://airflow.apache.org/docs/apache-airflow/stable/howto/operator.html#branching
""",
) as dag:
run_this_first = EmptyOperator(task_id="run_this_first")

options = ["a", "b", "c", "d"]

# Example branching on standard Python tasks
# Example branching with standard Python tasks

# [START howto_operator_branch_python]
@task.branch()
@@ -74,7 +117,7 @@ def some_task():
# Label is optional here, but it can help identify more complex branches
random_choice_instance >> Label(option) >> t >> empty >> join

# Example the same with external Python calls
# Example branching with external Python execution

# [START howto_operator_branch_ext_py]
@task.branch_external_python(python=PATH_TO_PYTHON_BINARY)
@@ -102,11 +145,11 @@ def some_ext_py_task():
# Label is optional here, but it can help identify more complex branches
random_choice_ext_py >> Label(option) >> t >> join_ext_py

# Example the same with Python virtual environments
# Example branching with Python virtual environments

# [START howto_operator_branch_virtualenv]
# Note: Passing a caching dir allows to keep the virtual environment over multiple runs
# Run the example a second time and see that it reuses it and is faster.
# Passing a cache directory allows the virtual environment to be reused
# across runs, reducing setup overhead on subsequent executions.
VENV_CACHE_PATH = tempfile.gettempdir()

@task.branch_virtualenv(requirements=["numpy~=1.26.0"], venv_cache_path=VENV_CACHE_PATH)
@@ -129,7 +172,9 @@ def branching_virtualenv(choices) -> str:
for option in options:

@task.virtualenv(
task_id=f"venv_{option}", requirements=["numpy~=1.26.0"], venv_cache_path=VENV_CACHE_PATH
task_id=f"venv_{option}",
requirements=["numpy~=1.26.0"],
venv_cache_path=VENV_CACHE_PATH,
)
def some_venv_task():
import numpy as np
@@ -16,8 +16,8 @@
# specific language governing permissions and limitations
# under the License.
"""
Example DAG demonstrating the usage of the TaskFlow API to execute Python functions natively and within a
virtual environment.
Example DAG demonstrating the usage of the TaskFlow API to execute Python functions
natively and within a virtual environment.
"""

from __future__ import annotations
@@ -41,6 +41,28 @@
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
doc_md="""
### TaskFlow API: Python Task Execution and Isolation

The TaskFlow API (`@task` decorator) provides a Pythonic alternative to PythonOperator, with key advantages: automatic dependency inference from return values, simplified XCom handling, and improved code readability. Functions decorated with `@task` are tasks in your DAG graph, not Python functions called during parsing; this distinction enables proper task isolation, testing, and dynamic task generation.
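
For instance, a minimal sketch of the data-flow style (DAG and task names are illustrative):

```python
import pendulum
from airflow.sdk import dag, task

@dag(schedule=None, start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False)
def xcom_flow_sketch():
    @task
    def extract() -> dict:
        return {"rows": 42}  # return value is pushed to XCom automatically

    @task
    def load(stats: dict) -> None:
        print(stats["rows"])  # argument is pulled from XCom automatically

    load(extract())  # the data flow also defines the task dependency

xcom_flow_sketch()
```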

**When to use TaskFlow over PythonOperator:**
- Complex data pipelines where tasks pass structured data (dicts, objects); TaskFlow's implicit XCom handling reduces boilerplate
- Workflows requiring high code readability and maintainability; Python functions are easier to test and understand than operator instantiation
- Dynamic task generation and conditional task creation; decorator syntax integrates seamlessly with Python control flow

**Dependency isolation with virtualenv and external Python:**
- `@task.virtualenv`: Creates isolated Python environments per task, essential when tasks require conflicting libraries (e.g., TensorFlow 1.x vs. 2.x in the same DAG) or strict reproducibility requirements
- `@task.external_python`: Delegates execution to a pre-installed Python interpreter, reducing overhead when isolation is needed but virtualenv creation is too expensive
- Both patterns prevent DAG-level package conflicts and enable using multiple Python versions in a single workflow

**Important behavior:**
- TaskFlow functions execute at task runtime in the chosen environment, not during DAG parsing; heavy imports inside functions avoid DAG initialization overhead
- Return values are automatically serialized to XCom; XCom limits and serialization backends apply
- Virtualenv and external_python tasks consume additional executor resources and network bandwidth for environment setup

📖 [TaskFlow API Documentation](https://airflow.apache.org/docs/apache-airflow/stable/tutorial/taskflow.html)
""",
)
def example_python_decorator():
# [START howto_operator_python]
@@ -63,28 +85,28 @@ def log_sql(**kwargs):
# [END howto_operator_python_render_sql]

# [START howto_operator_python_kwargs]
# Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively
@task
def my_sleeping_function(random_base):
"""This is a function that will run within the DAG execution"""
"""This is a function that will run within the DAG execution."""
time.sleep(random_base)

for i in range(5):
sleeping_task = my_sleeping_function.override(task_id=f"sleep_for_{i}")(random_base=i / 10)

run_this >> log_the_sql >> sleeping_task
# [END howto_operator_python_kwargs]

# [START howto_operator_python_venv]
@task.virtualenv(
task_id="virtualenv_python", requirements=["colorama==0.4.0"], system_site_packages=False
task_id="virtualenv_python",
requirements=["colorama==0.4.0"],
system_site_packages=False,
)
def callable_virtualenv():
"""
Example function that will be performed in a virtual environment.

Importing at the module level ensures that it will not attempt to import the
library before it is installed.
Importing at the module level ensures that it will not attempt
to import the library before it is installed.
"""
from time import sleep

@@ -108,10 +130,8 @@ def callable_virtualenv():
@task.external_python(task_id="external_python", python=PATH_TO_PYTHON_BINARY)
def callable_external_python():
"""
Example function that will be performed in a virtual environment.

Importing at the module level ensures that it will not attempt to import the
library before it is installed.
Example function that will be performed using an external
Python interpreter.
"""
import sys
from time import sleep
@@ -15,8 +15,9 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Example DAG demonstrating the usage of the sensor decorator."""
"""
Example DAG demonstrating the usage of the sensor decorator.
"""

from __future__ import annotations

@@ -35,12 +36,46 @@
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
doc_md="""
### Sensor Decorator: Data-Driven Task Dependencies

Sensors are used to delay downstream execution until an external condition
or data dependency is satisfied. Unlike purely time-based scheduling,
sensors actively evaluate external state, making them a key building block
for data-driven workflows.

**When to use sensors versus time-based scheduling:**
- Use sensors when downstream tasks depend on external data availability
or system state rather than elapsed time
- Time-based schedules are appropriate when execution timing is the primary
driver; sensors are appropriate when data readiness is the driver
- Combining both patterns enables data-aware scheduling
(for example, "run daily, but only after upstream data is ready")

**Runtime behavior and resource considerations:**
- Sensors block downstream task execution but do not block DAG scheduling;
the DAG run remains active until the sensor succeeds or times out
- *Poke mode* (default): the task occupies an executor slot while polling,
which can increase resource usage for long waits
- *Reschedule mode*: the worker slot is released between pokes and the task
is rescheduled, reducing resource consumption and improving scalability
for long-running sensors
- On timeout, downstream task behavior depends on trigger rules and
failure-handling configuration

**Data passing:**
- Sensors can return XCom values via `PokeReturnValue`, allowing downstream
tasks to consume metadata discovered during sensing without re-querying
the external system
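
A minimal sketch of that pattern (the file path is illustrative, and the `PokeReturnValue` import path is assumed):

```python
import os

import pendulum
from airflow.sdk import dag, task
from airflow.sensors.base import PokeReturnValue  # import path assumed; may differ across versions

@dag(schedule=None, start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False)
def sensor_xcom_sketch():
    @task.sensor(poke_interval=300, timeout=3600, mode="reschedule")
    def wait_for_file() -> PokeReturnValue:
        path = "/data/incoming/today.csv"  # illustrative external dependency
        # is_done=False triggers another poke; xcom_value is pushed once done
        return PokeReturnValue(is_done=os.path.exists(path), xcom_value=path)

    @task
    def process(path: str) -> None:
        print(f"processing {path}")  # consumes the sensed path via XCom

    process(wait_for_file())

sensor_xcom_sketch()
```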

📖 **Related documentation**
https://airflow.apache.org/docs/apache-airflow/stable/howto/operator.html#sensors
""",
)
def example_sensor_decorator():
# [END instantiate_dag]

# [START wait_function]
# Using a sensor operator to wait for the upstream data to be ready.
# Using a sensor decorator to wait for upstream data to be ready.
@task.sensor(poke_interval=60, timeout=3600, mode="reschedule")
def wait_for_upstream() -> PokeReturnValue:
return PokeReturnValue(is_done=True, xcom_value="xcom_value")
@@ -25,7 +25,40 @@
from airflow.sdk import chain, dag, task


@dag(schedule=None, start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=["example"])
@dag(
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
doc_md="""
### Short-Circuit Decorator: Conditional Task Skipping

Short-circuiting allows a task to conditionally prevent downstream execution
by returning a falsy value, marking downstream tasks as skipped rather than failed.
This is fundamentally different from task failure: skipped tasks follow a separate
execution and alerting path.

**When to use short-circuiting:**
- Guard conditions that prevent unnecessary downstream work (cost control, resource optimization)
- Data validation gates where non-execution is expected and not an error
- Conditional pipelines where skipping tasks is part of normal control flow

**Runtime behavior:**
- When a short-circuit task returns a falsy value, all immediately downstream tasks
are marked as skipped
- Downstream trigger rules determine how skipped state propagates further
(for example, `ALL_DONE` vs `ALL_SUCCESS`)
- Skipped tasks are typically excluded from failure-based alerting and callbacks
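
A minimal sketch of a short-circuit guard followed by a trigger-rule-controlled final task (names and the guard condition are illustrative; import paths assumed for Airflow 3.x):

```python
import pendulum
from airflow.sdk import dag, task
from airflow.utils.trigger_rule import TriggerRule  # import path assumed

@dag(schedule=None, start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False)
def short_circuit_sketch():
    @task.short_circuit()
    def has_new_data() -> bool:
        return False  # falsy: every directly downstream task is marked skipped

    @task
    def transform() -> None:
        print("transforming")  # skipped when the guard returns a falsy value

    @task(trigger_rule=TriggerRule.ALL_DONE)
    def cleanup() -> None:
        print("cleanup")  # still runs: ALL_DONE tolerates skipped upstreams

    has_new_data() >> transform() >> cleanup()

short_circuit_sketch()
```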

**Scheduling impact:**
- Short-circuiting affects only the current DAG run's execution path
- Future DAG runs are scheduled normally without modification to the DAG definition
- Useful for backfills and reprocessing scenarios without code changes

📖 **Related documentation**
https://airflow.apache.org/docs/apache-airflow/stable/howto/operator.html#short-circuiting
""",
)
def example_short_circuit_decorator():
# [START howto_operator_short_circuit]
@task.short_circuit()