Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 41 additions & 2 deletions airflow-core/src/airflow/models/renderedtifields.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from __future__ import annotations

import os
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any

import sqlalchemy_jsonfield
from sqlalchemy import (
Expand Down Expand Up @@ -51,6 +51,28 @@
from airflow.serialization.serialized_objects import SerializedBaseOperator


def _get_nested_value(obj: Any, path: str) -> Any:
    """
    Walk a dot-separated path through an object, one segment at a time.

    At every step a ``dict`` is queried by key, while any other object is queried
    by attribute name.

    :param obj: The root object the lookup starts from
    :param path: Dot-separated segments (e.g., "configuration.query.sql")
    :return: The value reached at the end of the path, or None when a segment is
        missing or resolves to None
    """
    node = obj
    for segment in path.split("."):
        if isinstance(node, dict):
            node = node.get(segment)
        elif hasattr(node, segment):
            node = getattr(node, segment)
        else:
            # Neither a dict nor an object carrying this attribute: dead end.
            node = None
        # A missing or None intermediate value ends the walk immediately.
        if node is None:
            return None
    return node


def get_serialized_template_fields(task: SerializedBaseOperator):
    """
    Get and serialize the template fields for a task.

    Besides every name listed in ``task.template_fields``, each dot-separated key of
    ``task.template_fields_renderers`` whose first segment is a template field
    (e.g. ``"configuration.query.sql"``) is looked up inside that field's value.
    When the lookup yields something other than None, the value is serialized and
    stored under the full dotted key.

    :param task: Operator whose template field values are read with ``getattr``
    :return: Dict mapping each field name (or dotted renderer path) to its serialized value

    :meta private:
    """
    rendered_fields = {
        field: serialize_template_field(getattr(task, field), field) for field in task.template_fields
    }

    # ``or {}`` also covers an operator whose attribute is explicitly set to None.
    renderers = getattr(task, "template_fields_renderers", None) or {}
    for renderer_path in renderers:
        base_field, separator, nested_path = renderer_path.partition(".")
        # Only dotted paths rooted at a declared template field describe a nested value.
        if not separator or base_field not in task.template_fields:
            continue

        nested_value = _get_nested_value(getattr(task, base_field), nested_path)
        if nested_value is not None:
            rendered_fields[renderer_path] = serialize_template_field(nested_value, renderer_path)

    return rendered_fields


class RenderedTaskInstanceFields(TaskInstanceDependencies):
Expand Down
70 changes: 70 additions & 0 deletions airflow-core/tests/unit/models/test_renderedtifields.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import os
from collections import Counter
from collections.abc import Sequence
from datetime import date, timedelta
from typing import TYPE_CHECKING
from unittest import mock
Expand All @@ -33,6 +34,7 @@
from airflow._shared.timezones.timezone import datetime
from airflow.configuration import conf
from airflow.models import DagRun, Variable
from airflow.models.baseoperator import BaseOperator
from airflow.models.renderedtifields import RenderedTaskInstanceFields as RTIF
from airflow.models.taskmap import TaskMap
from airflow.providers.standard.operators.bash import BashOperator
Expand Down Expand Up @@ -448,3 +450,71 @@ def popuate_rtif(date):
# rerun the old run. this shouldn't fail
ti.task = task
ti.run()

def test_nested_dictionary_template_field_rendering(self, dag_maker):
    """
    Check that dotted ``template_fields_renderers`` keys get their own rendered entries.

    An operator declares ``configuration`` as its only template field and registers a
    renderer for the nested path ``configuration.query.sql``. The test expects the
    rendered fields to contain both the whole dictionary and the nested SQL string,
    with the ``{{ ds }}`` placeholder no longer present in the nested entry, and
    expects both keys to come back after the record is flushed to the database.
    """
    nested_key = "configuration.query.sql"
    sql_prefix = "select * from my_table where date = '"

    class MyConfigOperator(BaseOperator):
        # Minimal operator whose single template field holds a nested dictionary.
        template_fields: Sequence[str] = ("configuration",)
        template_fields_renderers = {
            "configuration": "json",
            "configuration.query.sql": "sql",
        }

        def __init__(self, configuration: dict, **kwargs):
            super().__init__(**kwargs)
            self.configuration = configuration

    # Input payload: the SQL statement lives two levels deep and carries a template.
    config_payload = {
        "query": {
            "job_id": "123",
            "sql": "select * from my_table where date = '{{ ds }}'",
        }
    }

    with dag_maker("test_nested_dict_rendering"):
        operator = MyConfigOperator(task_id="test_config", configuration=config_payload)
    dag_run = dag_maker.create_dagrun()

    session = dag_maker.session
    task_instance = dag_run.task_instances[0]
    task_instance.task = operator
    record = RTIF(ti=task_instance)

    # The top-level template field must be present, with its SQL templated.
    assert "configuration" in record.rendered_fields
    whole_config = record.rendered_fields["configuration"]
    assert isinstance(whole_config, dict)
    assert whole_config["query"]["job_id"] == "123"
    assert sql_prefix in whole_config["query"]["sql"]
    assert whole_config["query"]["sql"] != config_payload["query"]["sql"]

    # The dotted renderer path must have produced a separate, rendered entry.
    assert nested_key in record.rendered_fields
    nested_sql = record.rendered_fields[nested_key]
    assert isinstance(nested_sql, str)
    assert sql_prefix in nested_sql
    assert "{{ ds }}" not in nested_sql

    # Round-trip through the database and confirm both keys survive.
    session.add(record)
    session.flush()

    stored_fields = RTIF.get_templated_fields(ti=task_instance, session=session)
    assert stored_fields is not None
    assert "configuration" in stored_fields
    assert nested_key in stored_fields
    assert stored_fields[nested_key] == nested_sql