diff --git a/providers/standard/src/airflow/providers/standard/example_dags/example_branch_operator_decorator.py b/providers/standard/src/airflow/providers/standard/example_dags/example_branch_operator_decorator.py
index 3c8c2ba16057f..99f3981742e32 100644
--- a/providers/standard/src/airflow/providers/standard/example_dags/example_branch_operator_decorator.py
+++ b/providers/standard/src/airflow/providers/standard/example_dags/example_branch_operator_decorator.py
@@ -15,11 +15,13 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""Example DAG demonstrating the usage of the branching TaskFlow API decorators.
+"""
+Example DAG demonstrating the usage of branching TaskFlow API decorators.
 
-It shows how to use standard Python ``@task.branch`` as well as the external Python
-version ``@task.branch_external_python`` which calls an external Python interpreter and
-the ``@task.branch_virtualenv`` which builds a temporary Python virtual environment.
+This example shows how to use standard Python `@task.branch`, as well as
+`@task.branch_external_python` and `@task.branch_virtualenv` for branching
+logic executed in external Python interpreters or isolated virtual
+environments.
 """
 
 from __future__ import annotations
@@ -36,18 +38,70 @@
 PATH_TO_PYTHON_BINARY = sys.executable
 
+
 with DAG(
     dag_id="example_branch_python_operator_decorator",
     start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
     catchup=False,
     schedule="@daily",
     tags=["example", "example2"],
+    doc_md="""
+    ### Branch Decorator: Runtime Path Selection
+
+    Branching enables conditional execution paths within a DAG by selecting
+    which downstream task(s) should run at runtime, while all other paths
+    are marked as skipped. This allows mutually exclusive workflows to be
+    expressed cleanly within a single DAG definition.
+
+    **How branching selects execution paths:**
+    - A branch task returns the `task_id` (or list of `task_id`s) corresponding
+      to the next task(s) that should execute
+    - Only the returned downstream task(s) are executed; all other immediate
+      downstream tasks are marked as skipped
+    - Skipped branches do not fail the DAG run and are treated as a normal
+      execution outcome
+
+    **Handling skipped branches downstream:**
+    - Tasks that follow a branching point must use trigger rules that account
+      for skipped upstream tasks (for example, `NONE_FAILED_MIN_ONE_SUCCESS`)
+    - Without appropriate trigger rules, downstream tasks may not execute
+      as expected due to skipped upstream states
+    - This behavior differs from short-circuiting, where all downstream
+      execution may be prevented entirely
+
+    **Common use cases:**
+    - Conditional data processing based on runtime characteristics
+      (for example, small vs. large datasets)
+    - Environment-driven workflows where different paths are selected
+      dynamically
+    - Optional enrichment or validation steps that should only run when needed
+    - Mutually exclusive downstream actions within a single DAG
+
+    **Branching vs. Python if/else:**
+    Branching is not equivalent to a Python `if/else` statement. All possible
+    branches exist in the DAG graph at parse time, and the branch task selects
+    which path is taken during execution.
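+
+    A minimal sketch of the pattern (hypothetical task ids, not the tasks
+    defined below):
+
+    ```python
+    @task.branch()
+    def choose_path(row_count: int) -> str:
+        # Return the task_id to run next; sibling downstream
+        # tasks are marked as skipped rather than failed.
+        return "small_path" if row_count < 1000 else "big_path"
+    ```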
+
+    📖 **Related documentation**
+    https://airflow.apache.org/docs/apache-airflow/stable/howto/operator.html#branching
+    """,
 ) as dag:
     run_this_first = EmptyOperator(task_id="run_this_first")
 
     options = ["a", "b", "c", "d"]
 
-    # Example branching on standard Python tasks
+    # Example branching with standard Python tasks
 
     # [START howto_operator_branch_python]
     @task.branch()
@@ -74,7 +117,7 @@ def some_task():
         # Label is optional here, but it can help identify more complex branches
         random_choice_instance >> Label(option) >> t >> empty >> join
 
-    # Example the same with external Python calls
+    # Example branching with external Python execution
 
     # [START howto_operator_branch_ext_py]
     @task.branch_external_python(python=PATH_TO_PYTHON_BINARY)
@@ -102,11 +145,11 @@ def some_ext_py_task():
         # Label is optional here, but it can help identify more complex branches
         random_choice_ext_py >> Label(option) >> t >> join_ext_py
 
-    # Example the same with Python virtual environments
+    # Example branching with Python virtual environments
 
     # [START howto_operator_branch_virtualenv]
-    # Note: Passing a caching dir allows to keep the virtual environment over multiple runs
-    # Run the example a second time and see that it reuses it and is faster.
+    # Passing a cache directory allows the virtual environment to be reused
+    # across runs, reducing setup overhead on subsequent executions.
     VENV_CACHE_PATH = tempfile.gettempdir()
 
     @task.branch_virtualenv(requirements=["numpy~=1.26.0"], venv_cache_path=VENV_CACHE_PATH)
@@ -129,7 +172,9 @@ def branching_virtualenv(choices) -> str:
     for option in options:
 
         @task.virtualenv(
-            task_id=f"venv_{option}", requirements=["numpy~=1.26.0"], venv_cache_path=VENV_CACHE_PATH
+            task_id=f"venv_{option}",
+            requirements=["numpy~=1.26.0"],
+            venv_cache_path=VENV_CACHE_PATH,
         )
         def some_venv_task():
             import numpy as np
diff --git a/providers/standard/src/airflow/providers/standard/example_dags/example_python_decorator.py b/providers/standard/src/airflow/providers/standard/example_dags/example_python_decorator.py
index ac9938d92eac2..e73cdc8639807 100644
--- a/providers/standard/src/airflow/providers/standard/example_dags/example_python_decorator.py
+++ b/providers/standard/src/airflow/providers/standard/example_dags/example_python_decorator.py
@@ -16,8 +16,8 @@
 # specific language governing permissions and limitations
 # under the License.
 """
-Example DAG demonstrating the usage of the TaskFlow API to execute Python functions natively and within a
-virtual environment.
+Example DAG demonstrating the usage of the TaskFlow API to execute Python functions
+natively and within a virtual environment.
 """
 
 from __future__ import annotations
@@ -41,6 +41,42 @@
     start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
     catchup=False,
     tags=["example"],
+    doc_md="""
+    ### TaskFlow API: Python Task Execution and Isolation
+
+    The TaskFlow API (`@task` decorator) provides a Pythonic alternative to PythonOperator, with key advantages: automatic dependency inference from return values, simplified XCom handling, and improved code readability. Functions decorated with `@task` are tasks in your DAG graph, not Python functions called during parsing; this distinction enables proper task isolation, testing, and dynamic task generation.
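+
+    A minimal sketch of the implicit-XCom pattern (hypothetical task names, not the tasks defined in this file):
+
+    ```python
+    @task
+    def extract() -> dict:
+        return {"rows": 42}
+
+    @task
+    def load(payload: dict) -> None:
+        print(payload["rows"])
+
+    load(extract())  # return value travels via XCom; the dependency is inferred
+    ```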
+
+    **When to use TaskFlow over PythonOperator:**
+    - Complex data pipelines where tasks pass structured data (dicts, objects); TaskFlow's implicit XCom handling reduces boilerplate
+    - Workflows requiring high code readability and maintainability; Python functions are easier to test and understand than operator instantiation
+    - Dynamic task generation and conditional task creation; decorator syntax integrates seamlessly with Python control flow
+
+    **Dependency isolation with virtualenv and external Python:**
+    - `@task.virtualenv`: Creates isolated Python environments per task, essential when tasks require conflicting libraries (e.g., TensorFlow 1.x vs. 2.x in the same DAG) or strict reproducibility requirements
+    - `@task.external_python`: Delegates execution to a pre-installed Python interpreter, reducing overhead when isolation is needed but virtualenv creation is too expensive
+    - Both patterns prevent DAG-level package conflicts and enable using multiple Python versions in a single workflow
+
+    **Important behavior:**
+    - TaskFlow functions execute at task runtime in the chosen environment, not during DAG parsing; heavy imports inside functions avoid DAG initialization overhead
+    - Return values are automatically serialized to XCom; XCom limits and serialization backends apply
+    - Virtualenv and external_python tasks consume additional executor resources and network bandwidth for environment setup
+
+    📖 [TaskFlow API Documentation](https://airflow.apache.org/docs/apache-airflow/stable/tutorial/taskflow.html)
+    """,
 )
 def example_python_decorator():
     # [START howto_operator_python]
@@ -63,28 +85,28 @@ def log_sql(**kwargs):
     # [END howto_operator_python_render_sql]
 
     # [START howto_operator_python_kwargs]
-    # Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively
     @task
     def my_sleeping_function(random_base):
-        """This is a function that will run within the DAG execution"""
+        """This is a function that will run within the DAG execution."""
         time.sleep(random_base)
 
     for i in range(5):
         sleeping_task = my_sleeping_function.override(task_id=f"sleep_for_{i}")(random_base=i / 10)
-        run_this >> log_the_sql >> sleeping_task
     # [END howto_operator_python_kwargs]
 
     # [START howto_operator_python_venv]
     @task.virtualenv(
-        task_id="virtualenv_python", requirements=["colorama==0.4.0"], system_site_packages=False
+        task_id="virtualenv_python",
+        requirements=["colorama==0.4.0"],
+        system_site_packages=False,
     )
     def callable_virtualenv():
         """
         Example function that will be performed in a virtual environment.
 
-        Importing at the module level ensures that it will not attempt to import the
-        library before it is installed.
+        Importing inside the function (rather than at the module level)
+        ensures that the library is not imported before it is installed.
         """
         from time import sleep
 
@@ -108,10 +130,12 @@ def callable_virtualenv():
     @task.external_python(task_id="external_python", python=PATH_TO_PYTHON_BINARY)
     def callable_external_python():
         """
-        Example function that will be performed in a virtual environment.
-
-        Importing at the module level ensures that it will not attempt to import the
-        library before it is installed.
+        Example function that will be performed using an external
+        Python interpreter.
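+
+        Imports are kept inside the function body so they are resolved
+        by the external interpreter at run time rather than when the
+        DAG file is parsed.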
""" import sys from time import sleep diff --git a/providers/standard/src/airflow/providers/standard/example_dags/example_sensor_decorator.py b/providers/standard/src/airflow/providers/standard/example_dags/example_sensor_decorator.py index 64ea80400ceb8..354ad790c4961 100644 --- a/providers/standard/src/airflow/providers/standard/example_dags/example_sensor_decorator.py +++ b/providers/standard/src/airflow/providers/standard/example_dags/example_sensor_decorator.py @@ -15,8 +15,9 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. - -"""Example DAG demonstrating the usage of the sensor decorator.""" +""" +Example DAG demonstrating the usage of the sensor decorator. +""" from __future__ import annotations @@ -35,12 +36,46 @@ start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=["example"], + doc_md=""" + ### Sensor Decorator: Data-Driven Task Dependencies + + Sensors are used to delay downstream execution until an external condition + or data dependency is satisfied. Unlike purely time-based scheduling, + sensors actively evaluate external state, making them a key building block + for data-driven workflows. + + **When to use sensors versus time-based scheduling:** + - Use sensors when downstream tasks depend on external data availability + or system state rather than elapsed time + - Time-based schedules are appropriate when execution timing is the primary + driver; sensors are appropriate when data readiness is the driver + - Combining both patterns enables data-aware scheduling + (for example, "run daily, but only after upstream data is ready") + + **Runtime behavior and resource considerations:** + - Sensors block downstream task execution but do not block DAG scheduling; + the DAG run remains active until the sensor succeeds or times out + - *Poke mode* (default): the task occupies an executor slot while polling, + which can increase resource usage for long waits + - *Reschedule mode*: the task is deferred between polls, reducing resource + consumption and improving scalability for long-running sensors + - On timeout, downstream task behavior depends on trigger rules and + failure-handling configuration + + **Data passing:** + - Sensors can return XCom values via `PokeReturnValue`, allowing downstream + tasks to consume metadata discovered during sensing without re-querying + the external system + + 📖 **Related documentation** + https://airflow.apache.org/docs/apache-airflow/stable/howto/operator.html#sensors + """, ) def example_sensor_decorator(): # [END instantiate_dag] # [START wait_function] - # Using a sensor operator to wait for the upstream data to be ready. + # Using a sensor decorator to wait for upstream data to be ready. 
     @task.sensor(poke_interval=60, timeout=3600, mode="reschedule")
     def wait_for_upstream() -> PokeReturnValue:
         return PokeReturnValue(is_done=True, xcom_value="xcom_value")
diff --git a/providers/standard/src/airflow/providers/standard/example_dags/example_short_circuit_decorator.py b/providers/standard/src/airflow/providers/standard/example_dags/example_short_circuit_decorator.py
index 598778edfd82e..bd624b7153923 100644
--- a/providers/standard/src/airflow/providers/standard/example_dags/example_short_circuit_decorator.py
+++ b/providers/standard/src/airflow/providers/standard/example_dags/example_short_circuit_decorator.py
@@ -25,7 +25,42 @@
 
 from airflow.sdk import chain, dag, task
 
 
-@dag(schedule=None, start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=["example"])
+@dag(
+    schedule=None,
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+    catchup=False,
+    tags=["example"],
+    doc_md="""
+    ### Short-Circuit Decorator: Conditional Task Skipping
+
+    Short-circuiting allows a task to conditionally prevent downstream execution
+    by returning a falsy value, marking downstream tasks as skipped rather than failed.
+    This is fundamentally different from task failure; skipped tasks follow a separate
+    execution and alerting path.
+
+    **When to use short-circuiting:**
+    - Guard conditions that prevent unnecessary downstream work (cost control, resource optimization)
+    - Data validation gates where non-execution is expected and not an error
+    - Conditional pipelines where skipping tasks is part of normal control flow
+
+    **Runtime behavior:**
+    - When a short-circuit task returns a falsy value, all immediately downstream tasks
+      are marked as skipped
+    - Downstream trigger rules determine how skipped state propagates further
+      (for example, `ALL_DONE` vs `ALL_SUCCESS`)
+    - Skipped tasks are typically excluded from failure-based alerting and callbacks
+
+    **Scheduling impact:**
+    - Short-circuiting affects only the current DAG run's execution path
+    - Future DAG runs are scheduled normally without modification to the DAG definition
+    - Useful for backfills and reprocessing scenarios without code changes
+
+    📖 **Related documentation**
+    https://airflow.apache.org/docs/apache-airflow/stable/howto/operator.html#short-circuiting
+    """,
+)
 def example_short_circuit_decorator():
     # [START howto_operator_short_circuit]
     @task.short_circuit()
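+    # Returning a truthy value from the decorated callable lets downstream
+    # tasks run; a falsy return marks them as skipped.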