Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support jinja2 native Python types #14603

Merged
merged 4 commits into from
May 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import pendulum
from croniter import croniter
from dateutil.relativedelta import relativedelta
from jinja2.nativetypes import NativeEnvironment
from sqlalchemy import Boolean, Column, ForeignKey, Index, Integer, String, Text, func, or_
from sqlalchemy.orm import backref, joinedload, relationship
from sqlalchemy.orm.session import Session
Expand Down Expand Up @@ -257,6 +258,7 @@ def __init__(
access_control: Optional[Dict] = None,
is_paused_upon_creation: Optional[bool] = None,
jinja_environment_kwargs: Optional[Dict] = None,
render_template_as_native_obj: bool = False,
tags: Optional[List[str]] = None,
):
from airflow.utils.task_group import TaskGroup
Expand Down Expand Up @@ -359,6 +361,7 @@ def __init__(
self.is_paused_upon_creation = is_paused_upon_creation

self.jinja_environment_kwargs = jinja_environment_kwargs
self.render_template_as_native_obj = render_template_as_native_obj
self.tags = tags
self._task_group = TaskGroup.create_root(self)

Expand Down Expand Up @@ -991,8 +994,10 @@ def get_template_env(self) -> jinja2.Environment:
}
if self.jinja_environment_kwargs:
jinja_env_options.update(self.jinja_environment_kwargs)

env = jinja2.Environment(**jinja_env_options) # type: ignore
if self.render_template_as_native_obj:
env = NativeEnvironment(**jinja_env_options)
else:
env = jinja2.Environment(**jinja_env_options) # type: ignore

# Add any user defined items. Safe to edit globals as long as no templates are rendered yet.
# http://jinja.pocoo.org/docs/2.10/api/#jinja2.Environment.globals
Expand Down
12 changes: 8 additions & 4 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -1836,10 +1836,14 @@ def get_email_subject_content(self, exception):
max_tries=self.max_tries,
)
)

jinja_env = jinja2.Environment(
loader=jinja2.FileSystemLoader(os.path.dirname(__file__)), autoescape=True
)
if self.dag.render_template_as_native_obj:
jinja_env = jinja2.nativetypes.NativeEnvironment(
loader=jinja2.FileSystemLoader(os.path.dirname(__file__)), autoescape=True
)
else:
jinja_env = jinja2.Environment(
loader=jinja2.FileSystemLoader(os.path.dirname(__file__)), autoescape=True
)
subject = jinja_env.from_string(default_subject).render(**jinja_context)
html_content = jinja_env.from_string(default_html_content).render(**jinja_context)
html_content_err = jinja_env.from_string(default_html_content_err).render(**jinja_context)
Expand Down
1 change: 1 addition & 0 deletions airflow/operators/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ def execute_callable(self):
python_callable_source=self.get_python_source(),
),
filename=script_filename,
render_template_as_native_obj=self.dag.render_template_as_native_obj,
)

execute_in_subprocess(
Expand Down
1 change: 1 addition & 0 deletions airflow/serialization/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
"is_paused_upon_creation": { "type": "boolean" },
"has_on_success_callback": { "type": "boolean" },
"has_on_failure_callback": { "type": "boolean" },
"render_template_as_native_obj": { "type": "boolean" },
"tags": { "type": "array" },
"_task_group": {"anyOf": [
{ "type": "null" },
Expand Down
15 changes: 13 additions & 2 deletions airflow/utils/python_virtualenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,11 @@ def prepare_virtualenv(
return f'{venv_directory}/bin/python'


def write_python_script(jinja_context: dict, filename: str):
def write_python_script(
jinja_context: dict,
filename: str,
render_template_as_native_obj: bool = False,
):
"""
Renders the python script to a file to execute in the virtual environment.

Expand All @@ -109,8 +113,15 @@ def write_python_script(jinja_context: dict, filename: str):
:type jinja_context: dict
:param filename: The name of the file to dump the rendered script to.
:type filename: str
:param render_template_as_native_obj: If ``True``, rendered Jinja template would be converted
to a native Python object
"""
template_loader = jinja2.FileSystemLoader(searchpath=os.path.dirname(__file__))
template_env = jinja2.Environment(loader=template_loader, undefined=jinja2.StrictUndefined)
if render_template_as_native_obj:
template_env = jinja2.nativetypes.NativeEnvironment(
loader=template_loader, undefined=jinja2.StrictUndefined
)
else:
template_env = jinja2.Environment(loader=template_loader, undefined=jinja2.StrictUndefined)
template = template_env.get_template('python_virtualenv_script.jinja2')
template.stream(**jinja_context).dump(filename)
60 changes: 59 additions & 1 deletion docs/apache-airflow/concepts/operators.rst
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ For example, say you want to pass the execution date as an environment variable

Here, ``{{ ds }}`` is a macro, and because the ``env`` parameter of the ``BashOperator`` is templated with Jinja, the execution date will be available as an environment variable named ``EXECUTION_DATE`` in your Bash script.

You can use Jinja templating with every parameter that is marked as "templated" in the documentation. Template substitution occurs just before the pre_execute function of your operator is called.
You can use Jinja templating with every parameter that is marked as "templated" in the documentation. Template substitution occurs just before the ``pre_execute`` function of your operator is called.

You can also use Jinja templating with nested fields, as long as these nested fields are marked as templated in the structure they belong to: fields registered in ``template_fields`` property will be submitted to template substitution, like the ``path`` field in the example below:

Expand Down Expand Up @@ -141,3 +141,61 @@ You can pass custom options to the Jinja ``Environment`` when creating your DAG.
)

See the `Jinja documentation <https://jinja.palletsprojects.com/en/master/api/#jinja2.Environment>`_ to find all available options.

Rendering Fields as Native Python Objects
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

By default, all the ``template_fields`` are rendered as strings.

Example, let's say ``extract`` task pushes a dictionary
(Example: ``{"1001": 301.27, "1002": 433.21, "1003": 502.22}``) to :ref:`XCom <concepts:xcom>` table.
Now, when the following task is run, ``order_data`` argument is passed a string, example:
``'{"1001": 301.27, "1002": 433.21, "1003": 502.22}'``.

.. code-block:: python

transform = PythonOperator(
task_id="transform", op_kwargs={"order_data": "{{ti.xcom_pull('extract')}}"},
python_callable=transform
)


If you instead want the rendered template field to return a Native Python object (``dict`` in our example),
you can pass ``render_template_as_native_obj=True`` to the DAG as follows:

.. code-block:: python

dag = DAG(
dag_id="example_template_as_python_object",
schedule_interval=None,
start_date=days_ago(2),
render_template_as_native_obj=True,
)

def extract():
data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
return json.loads(data_string)

def transform(order_data):
print(type(order_data))
for value in order_data.values():
total_order_value += value
return {"total_order_value": total_order_value}

extract_task = PythonOperator(
task_id="extract",
python_callable=extract
)

transform_task = PythonOperator(
task_id="transform", op_kwargs={"order_data": "{{ti.xcom_pull('extract')}}"},
python_callable=transform
)

extract_task >> transform_task

In this case, ``order_data`` argument is passed: ``{"1001": 301.27, "1002": 433.21, "1003": 502.22}``.

Airflow uses Jinja's `NativeEnvironment <https://jinja.palletsprojects.com/en/2.11.x/nativetypes/>`_
when ``render_template_as_native_obj`` is set to ``True``.
With ``NativeEnvironment``, rendering a template produces a native Python type.
51 changes: 51 additions & 0 deletions tests/models/test_baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,34 @@ def test_render_template(self, content, context, expected_output):
result = task.render_template(content, context)
assert result == expected_output

@parameterized.expand(
[
("{{ foo }}", {"foo": "bar"}, "bar"),
("{{ foo }}", {"foo": ["bar1", "bar2"]}, ["bar1", "bar2"]),
(["{{ foo }}", "{{ foo | length}}"], {"foo": ["bar1", "bar2"]}, [['bar1', 'bar2'], 2]),
(("{{ foo }}_1", "{{ foo }}_2"), {"foo": "bar"}, ("bar_1", "bar_2")),
("{{ ds }}", {"ds": date(2018, 12, 6)}, date(2018, 12, 6)),
(datetime(2018, 12, 6, 10, 55), {"foo": "bar"}, datetime(2018, 12, 6, 10, 55)),
("{{ ds }}", {"ds": datetime(2018, 12, 6, 10, 55)}, datetime(2018, 12, 6, 10, 55)),
(MockNamedTuple("{{ foo }}_1", "{{ foo }}_2"), {"foo": "bar"}, MockNamedTuple("bar_1", "bar_2")),
(
("{{ foo }}", "{{ foo.isoformat() }}"),
{"foo": datetime(2018, 12, 6, 10, 55)},
(datetime(2018, 12, 6, 10, 55), '2018-12-06T10:55:00'),
),
(None, {}, None),
([], {}, []),
({}, {}, {}),
]
)
def test_render_template_with_native_envs(self, content, context, expected_output):
"""Test render_template given various input types with Native Python types"""
with DAG("test-dag", start_date=DEFAULT_DATE, render_template_as_native_obj=True):
task = DummyOperator(task_id="op1")

result = task.render_template(content, context)
assert result == expected_output

def test_render_template_fields(self):
"""Verify if operator attributes are correctly templated."""
with DAG("test-dag", start_date=DEFAULT_DATE):
Expand All @@ -149,6 +177,20 @@ def test_render_template_fields(self):
assert task.arg1 == "footemplated"
assert task.arg2 == "bartemplated"

def test_render_template_fields_native_envs(self):
"""Verify if operator attributes are correctly templated to Native Python objects."""
with DAG("test-dag", start_date=DEFAULT_DATE, render_template_as_native_obj=True):
task = MockOperator(task_id="op1", arg1="{{ foo }}", arg2="{{ bar }}")

# Assert nothing is templated yet
assert task.arg1 == "{{ foo }}"
assert task.arg2 == "{{ bar }}"

# Trigger templating and verify if attributes are templated correctly
task.render_template_fields(context={"foo": ["item1", "item2"], "bar": 3})
assert task.arg1 == ["item1", "item2"]
assert task.arg2 == 3

@parameterized.expand(
[
({"user_defined_macros": {"foo": "bar"}}, "{{ foo }}", {}, "bar"),
Expand Down Expand Up @@ -228,6 +270,15 @@ def test_jinja_env_creation(self, mock_jinja_env):
task.render_template_fields(context={"foo": "whatever", "bar": "whatever"})
assert mock_jinja_env.call_count == 1

@mock.patch("airflow.models.dag.NativeEnvironment", autospec=True)
def test_jinja_env_creation_native_environment(self, mock_jinja_env):
"""Verify if a Jinja environment is created only once when templating."""
with DAG("test-dag", start_date=DEFAULT_DATE, render_template_as_native_obj=True):
task = MockOperator(task_id="op1", arg1="{{ foo }}", arg2="{{ bar }}")

task.render_template_fields(context={"foo": "whatever", "bar": "whatever"})
assert mock_jinja_env.call_count == 1

def test_set_jinja_env_additional_option(self):
"""Test render_template given various input types."""
with DAG(
Expand Down