Docs: Add templating info to TaskFlow tutorial
infused-kim committed Oct 10, 2024
1 parent 54005f8 commit 437616d
Showing 3 changed files with 186 additions and 0 deletions.
23 changes: 23 additions & 0 deletions airflow/example_dags/sql/tutorial_taskflow_template.sql
@@ -0,0 +1,23 @@
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/

select * from test_data
where 1=1
and run_id = '{{ run_id }}'
and something_else = '{{ params.foobar }}'
107 changes: 107 additions & 0 deletions airflow/example_dags/tutorial_taskflow_templates.py
@@ -0,0 +1,107 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

# [START tutorial]
# [START import_module]
import pendulum

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context

# [END import_module]


# [START instantiate_dag]
@dag(
    schedule="@daily",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
    params={"foobar": "param_from_dag", "other_param": "from_dag"},
)
def tutorial_taskflow_templates():
    """
    ### TaskFlow API Tutorial Documentation
    This is a simple data pipeline example which demonstrates the use of
    the templates in the TaskFlow API.
    Documentation that goes along with the Airflow TaskFlow API tutorial is
    located
    [here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html)
    """
    # [END instantiate_dag]

    # [START template_test]
    @task(
        # Causes variables that end with `.sql` to be read and templates
        # within to be rendered.
        templates_exts=[".sql"],
    )
    def template_test(sql, test_var, data_interval_end):
        context = get_current_context()

        # Will print...
        # select * from test_data
        # where 1=1
        #     and run_id = 'scheduled__2024-10-09T00:00:00+00:00'
        #     and something_else = 'param_from_task'
        print(f"sql: {sql}")

        # Will print `scheduled__2024-10-09T00:00:00+00:00`
        print(f"test_var: {test_var}")

        # Will print `2024-10-10 00:00:00+00:00`.
        # Note how we didn't pass this value when calling the task. Instead
        # it was passed by the decorator from the context
        print(f"data_interval_end: {data_interval_end}")

        # Will print...
        # run_id: scheduled__2024-10-09T00:00:00+00:00; params.other_param: from_dag
        template_str = "run_id: {{ run_id }}; params.other_param: {{ params.other_param }}"
        rendered_template = context["task"].render_template(
            template_str,
            context,
        )
        print(f"rendered template: {rendered_template}")

        # Will print the full context dict
        print(f"context: {context}")

    # [END template_test]

    # [START main_flow]
    template_test.override(
        # Will be merged with the dict defined in the dag
        # and override existing parameters.
        #
        # Must be passed into the decorator's parameters
        # through `.override()` not into the actual task
        # function
        params={"foobar": "param_from_task"},
    )(
        # Path is relative to the DAG file; it points at the SQL template
        # added in this commit.
        sql="sql/tutorial_taskflow_template.sql",
        test_var="{{ run_id }}",
    )
    # [END main_flow]


# [START dag_invocation]
tutorial_taskflow_templates()
# [END dag_invocation]

# [END tutorial]
56 changes: 56 additions & 0 deletions docs/apache-airflow/tutorial/taskflow.rst
@@ -629,6 +629,62 @@ method.
Current context is accessible only during the task execution. The context is not accessible during
``pre_execute`` or ``post_execute``. Calling this method outside execution context will raise an error.

Using templates in decorated tasks
----------------------------------------------

Arguments passed to your decorated function are automatically templated.

You can also use the ``templates_exts`` parameter to template entire files.

.. code-block:: python

    @task(templates_exts=[".sql"])
    def template_test(sql):
        print(f"sql: {sql}")


    template_test(sql="sql/test.sql")

This will read the content of ``sql/test.sql`` and replace all template variables. You can also pass a list of files and all of them will be templated.
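
To make the extension rule concrete, here is a rough standalone sketch of the decision it describes: a string argument ending in a configured extension is treated as a file to load and render, and anything else is rendered inline. This is not Airflow's implementation. ``render_string`` and ``render_argument`` are hypothetical helpers, and a toy regex substitution stands in for Jinja.

```python
import re
import tempfile
from pathlib import Path

# Toy stand-in for Jinja (assumption): replaces "{{ dotted.name }}" with a
# lookup in a nested context dict. Real Jinja supports far more syntax.
_PLACEHOLDER = re.compile(r"\{\{\s*([\w.]+)\s*\}\}")


def render_string(template: str, context: dict) -> str:
    def lookup(match: re.Match) -> str:
        value = context
        for key in match.group(1).split("."):
            value = value[key]
        return str(value)

    return _PLACEHOLDER.sub(lookup, template)


def render_argument(value: str, context: dict, templates_exts: list, base_dir: Path) -> str:
    # Hypothetical helper: load the file only when the value ends with one of
    # the configured extensions; otherwise render the string itself.
    if templates_exts and value.endswith(tuple(templates_exts)):
        return render_string((base_dir / value).read_text(), context)
    return render_string(value, context)


with tempfile.TemporaryDirectory() as tmp:
    base = Path(tmp)
    (base / "query.sql").write_text("select * from t where run_id = '{{ run_id }}'")
    ctx = {"run_id": "manual__1", "params": {"foobar": "x"}}

    from_file = render_argument("query.sql", ctx, [".sql"], base)
    inline = render_argument("{{ params.foobar }}", ctx, [".sql"], base)

print(from_file)
print(inline)
```

The same argument value can therefore mean "a path" or "a template string" depending only on its suffix, which is why the extension list has to be declared on the decorator.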

You can pass additional parameters to the template engine through `the params parameter </concepts/params.html>`_.

However, ``params`` must be passed to the decorator and not to your function directly, for example ``@task(templates_exts=[".sql"], params={"my_param": "wow"})``. It can then be used as ``{{ params.my_param }}`` in your templated files and function arguments.

Alternatively, you can also pass it using the ``.override()`` method:

.. code-block:: python

    @task()
    def template_test(input_var):
        print(f"input_var: {input_var}")


    template_test.override(params={"my_param": "wow"})(
        input_var="my param is: {{ params.my_param }}",
    )

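
The example DAG in this commit notes that task-level ``params`` are merged with the DAG-level dict and override existing keys. As a simplified model of that behaviour, using plain dicts and not Airflow's own params handling, the merge could be pictured as:

```python
# Values taken from the example DAG in this commit.
dag_params = {"foobar": "param_from_dag", "other_param": "from_dag"}
task_params = {"foobar": "param_from_task"}

# Later keys win, so task-level values override DAG-level ones while
# untouched DAG-level keys remain available to templates.
merged = {**dag_params, **task_params}
print(merged)
```

Under this model ``{{ params.foobar }}`` resolves to the task-level value and ``{{ params.other_param }}`` still resolves to the DAG-level one, which matches the output comments in the example DAG.
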
Finally, you can also manually render templates:

.. code-block:: python

    @task(params={"my_param": "wow"})
    def template_test():
        template_str = "run_id: {{ run_id }}; params.my_param: {{ params.my_param }}"

        context = get_current_context()
        rendered_template = context["task"].render_template(
            template_str,
            context,
        )

Here is a full example that demonstrates everything above:

.. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_templates.py
    :language: python
    :start-after: [START tutorial]
    :end-before: [END tutorial]
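
In the full example, ``data_interval_end`` is never passed by the caller; a comment there says the decorator supplies it from the context. A minimal sketch of that idea, matching context keys to parameter names, might look like the following. ``call_with_context`` is a hypothetical helper for illustration and not Airflow's actual mechanism.

```python
import inspect


def call_with_context(func, context: dict, **explicit_kwargs):
    # Fill any parameter the caller did not supply from the context,
    # matching purely by parameter name (an assumption of this sketch).
    params = inspect.signature(func).parameters
    injected = {
        name: context[name]
        for name in params
        if name not in explicit_kwargs and name in context
    }
    return func(**explicit_kwargs, **injected)


def template_test(test_var, data_interval_end):
    return f"{test_var} / {data_interval_end}"


context = {"run_id": "manual__1", "data_interval_end": "2024-10-10"}
result = call_with_context(template_test, context, test_var="hello")
print(result)
```

Only ``test_var`` is given explicitly here; ``data_interval_end`` is picked up because the function declares a parameter with that name.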

Conditionally skipping tasks
----------------------------
