21 changes: 21 additions & 0 deletions airflow-core/docs/authoring-and-scheduling/assets.rst
@@ -104,6 +104,27 @@ If needed, you can include an extra dictionary in an asset:

This can be used to supply a custom description for the asset, such as who owns the target file or what the file is for. The extra information does not affect an asset's identity.
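
For example (a minimal sketch using the ``Asset`` import path that appears in this PR's tests; the key names and values are purely illustrative):

.. code-block:: python

    from airflow.sdk.definitions.asset import Asset

    example_asset = Asset(
        "asset_example",
        extra={
            "owner": "data-platform-team",  # illustrative: who owns the target file
            "purpose": "daily example dump",  # illustrative: what the file is for
        },
    )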

You can also use Jinja templating in the extra dictionary to enrich the asset with runtime information, such as the logical date of the task run that emits events for the asset:

.. code-block:: python

BashOperator(
task_id="write_example_asset",
bash_command="echo 'writing...'",
outlets=Asset(
"asset_example",
extra={
"static_extra": "value",
"dag_id": "{{ dag.dag_id }}",
"nested_extra": {
"run_id": "{{ run_id }}",
"logical_date": "{{ ds }}",
}
}
),
)
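
Based on the tests added in this PR, the templates are rendered at run time from the task's context, and any extra values attached during execution (for example via ``Metadata`` yielded by a task) are merged on top of the rendered values. As a rough sketch, the asset event emitted by the operator above would carry an extra similar to the following (the DAG id, run id, and date are hypothetical placeholders):

.. code-block:: python

    {
        "static_extra": "value",
        "dag_id": "example_dag",  # rendered from "{{ dag.dag_id }}"
        "nested_extra": {
            "run_id": "manual__2024-12-03T10:00:00+00:00",  # rendered from "{{ run_id }}"
            "logical_date": "2024-12-03",  # rendered from "{{ ds }}"
        },
    }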


.. note:: **Security Note:** Asset URI and extra fields are not encrypted; they are stored in cleartext in Airflow's metadata database. Do NOT store any sensitive values, especially credentials, in either asset URIs or extra key values!

Creating a task to emit asset events
94 changes: 93 additions & 1 deletion airflow-core/tests/unit/models/test_taskinstance.py
@@ -64,7 +64,7 @@
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.operators.python import PythonOperator
from airflow.providers.standard.sensors.python import PythonSensor
from airflow.sdk import BaseSensorOperator, task, task_group
from airflow.sdk import BaseSensorOperator, Metadata, task, task_group
from airflow.sdk.api.datamodels._generated import AssetEventResponse, AssetResponse
from airflow.sdk.definitions.asset import Asset, AssetAlias
from airflow.sdk.definitions.param import process_params
@@ -1653,6 +1653,98 @@ def _write2_post_execute(context, _):
assert events["write2"].asset.uri == "test_outlet_asset_extra_2"
assert events["write2"].extra == {"x": 1}

@pytest.mark.want_activate_assets(True)
def test_outlet_asset_template_extra(self, dag_maker, session):
from airflow.sdk.definitions.asset import Asset

with dag_maker(schedule=None, serialized=True, session=session):

@task(
outlets=Asset(
"test_outlet_asset_template_extra1",
extra={
"static_extra": "value",
"dag_id": "{{ dag.dag_id }}",
"nested_extra": {
"task_id": "{{ task.task_id }}",
"logical_date": "{{ ds }}",
},
},
)
)
def write_template1(*, outlet_events):
yield Metadata(
Asset("test_outlet_asset_template_extra1"),
{
"dag_id": "override_dag_id",
"some_other_key": "some_other_value",
},
)

write_template1()

BashOperator(
task_id="write_template2",
bash_command=":",
outlets=Asset(
"test_outlet_asset_template_extra2",
extra={
"static_extra": "value",
"dag_id": "{{ dag.dag_id }}",
"nested_extra": {
"task_id": "{{ task.task_id }}",
"logical_date": "{{ ds }}",
},
},
),
)

BashOperator(
task_id="write_asset_no_extra",
bash_command=":",
outlets=Asset("test_outlet_asset_no_extra"),
)

dr: DagRun = dag_maker.create_dagrun()
for ti in dr.get_task_instances(session=session):
ti.run(session=session)

events = dict(iter(session.execute(select(AssetEvent.source_task_id, AssetEvent))))
assert set(events) == {"write_template1", "write_template2", "write_asset_no_extra"}

assert events["write_template1"].source_dag_id == dr.dag_id
assert events["write_template1"].source_run_id == dr.run_id
assert events["write_template1"].source_task_id == "write_template1"
assert events["write_template1"].asset.uri == "test_outlet_asset_template_extra1"
assert events["write_template1"].extra == {
"static_extra": "value",
"dag_id": "override_dag_id", # Overridden by Metadata
"nested_extra": {
"task_id": "write_template1",
"logical_date": dr.logical_date.strftime("%Y-%m-%d"),
},
"some_other_key": "some_other_value", # Added by Metadata
}

assert events["write_template2"].source_dag_id == dr.dag_id
assert events["write_template2"].source_run_id == dr.run_id
assert events["write_template2"].source_task_id == "write_template2"
assert events["write_template2"].asset.uri == "test_outlet_asset_template_extra2"
assert events["write_template2"].extra == {
"static_extra": "value",
"dag_id": dr.dag_id,
"nested_extra": {
"task_id": "write_template2",
"logical_date": dr.logical_date.strftime("%Y-%m-%d"),
},
}

assert events["write_asset_no_extra"].source_dag_id == dr.dag_id
assert events["write_asset_no_extra"].source_run_id == dr.run_id
assert events["write_asset_no_extra"].source_task_id == "write_asset_no_extra"
assert events["write_asset_no_extra"].asset.uri == "test_outlet_asset_no_extra"
assert events["write_asset_no_extra"].extra == {}

@pytest.mark.want_activate_assets(True)
def test_outlet_asset_extra_ignore_different(self, dag_maker, session):
from airflow.sdk.definitions.asset import Asset
4 changes: 2 additions & 2 deletions task-sdk/src/airflow/sdk/definitions/_internal/templater.py
@@ -89,7 +89,7 @@ def prepare_template(self) -> None:

def resolve_template_files(self) -> None:
"""Get the content of files for template_field / template_ext."""
if self.template_ext:
if getattr(self, "template_ext", None):
for field in self.template_fields:
content = getattr(self, field, None)
if isinstance(content, str) and content.endswith(tuple(self.template_ext)):
@@ -170,7 +170,7 @@ def render_template(
jinja_env = self.get_template_env()

if isinstance(value, str):
if value.endswith(tuple(self.template_ext)): # A filepath.
if hasattr(self, "template_ext") and value.endswith(tuple(self.template_ext)): # A filepath.
template = jinja_env.get_template(value)
else:
template = jinja_env.from_string(value)
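
The two guards above exist because ``Asset`` now inherits from ``Templater`` but defines no ``template_ext``; string values on such objects should always be rendered in place rather than treated as template file paths. A minimal sketch of that behaviour using plain ``jinja2`` (not the actual Templater code path; the class and values are illustrative):

    import jinja2

    env = jinja2.Environment()

    def render_value(obj, value: str, context: dict) -> str:
        # Mirrors the guarded branch: only treat the string as a template file
        # path when the object actually declares template_ext.
        if hasattr(obj, "template_ext") and value.endswith(tuple(obj.template_ext)):
            template = env.get_template(value)  # would load the file via a loader
        else:
            template = env.from_string(value)  # render the literal string
        return template.render(**context)

    class NoTemplateExt:  # stands in for Asset, which declares no template_ext
        pass

    print(render_value(NoTemplateExt(), "{{ dag_id }}", {"dag_id": "example_dag"}))  # example_dag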
22 changes: 21 additions & 1 deletion task-sdk/src/airflow/sdk/definitions/asset/__init__.py
@@ -29,13 +29,17 @@
import attrs

from airflow.sdk.api.datamodels._generated import AssetProfile
from airflow.sdk.definitions._internal.templater import Templater
from airflow.serialization.dag_dependency import DagDependency

if TYPE_CHECKING:
from collections.abc import Iterable, Iterator
from urllib.parse import SplitResult

import jinja2

from airflow.models.asset import AssetModel
from airflow.sdk import Context
from airflow.sdk.io.path import ObjectStoragePath
from airflow.serialization.serialized_objects import SerializedAssetWatcher
from airflow.triggers.base import BaseEventTrigger
@@ -305,7 +309,7 @@


@attrs.define(init=False, unsafe_hash=False)
class Asset(os.PathLike, BaseAsset):
class Asset(os.PathLike, BaseAsset, Templater):
"""A representation of data asset dependencies between workflows."""

name: str = attrs.field(
@@ -489,6 +493,22 @@ def asprofile(self) -> AssetProfile:
"""
return AssetProfile(name=self.name or None, uri=self.uri or None, type=Asset.__name__)

def render_extra_field(
self,
context: Context,
jinja_env: jinja2.Environment | None = None,
) -> None:
"""
Template extra attribute.

:param context: Context dict with values to apply on content.
:param jinja_env: Jinja environment to use for rendering.
"""
dag = context["dag"]
if not jinja_env:
jinja_env = self.get_template_env(dag=dag)
self._do_render_template_fields(self, ("extra",), context, jinja_env, set())
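
A usage sketch (illustrative only, not part of the PR's diff): given a running task's ``context``, the method renders any Jinja expressions in ``extra`` in place, which is how the task runner change below applies it before merging the values into the outlet event.

    # Illustrative usage -- "context" would come from the running task instance.
    asset = Asset("asset_example", extra={"dag_id": "{{ dag.dag_id }}"})
    asset.render_extra_field(context)
    assert asset.extra["dag_id"] == context["dag"].dag_id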


class AssetRef(BaseAsset, AttrsInstance):
"""
5 changes: 5 additions & 0 deletions task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -1270,6 +1270,11 @@ def _execute_task(context: Context, ti: RuntimeTaskInstance, log: Logger):

outlet_events = context_get_outlet_events(context)

for outlet in task.outlets or ():
if isinstance(outlet, Asset):
outlet.render_extra_field(context, jinja_env=task.dag.get_template_env())
outlet_events[outlet].extra.update(outlet.extra)

if (pre_execute_hook := task._pre_execute_hook) is not None:
create_executable_runner(pre_execute_hook, outlet_events, logger=log).run(context)
if getattr(pre_execute_hook := task.pre_execute, "__func__", None) is not BaseOperator.pre_execute:
90 changes: 87 additions & 3 deletions task-sdk/tests/task_sdk/execution_time/test_task_runner.py
@@ -942,10 +942,38 @@ def test_dag_parsing_context(make_ti_context, mock_supervisor_comms, monkeypatch
task_outlets=[
AssetProfile(name="s3://bucket/my-task", uri="s3://bucket/my-task", type="Asset")
],
outlet_events=[],
outlet_events=[
{
"dest_asset_key": {"name": "s3://bucket/my-task", "uri": "s3://bucket/my-task"},
"extra": {},
}
],
),
id="asset",
),
pytest.param(
[
Asset(
name="s3://bucket/my-task",
uri="s3://bucket/my-task",
extra={"task_id": "{{ task.task_id }}"},
)
],
SucceedTask(
state="success",
end_date=timezone.datetime(2024, 12, 3, 10, 0),
task_outlets=[
AssetProfile(name="s3://bucket/my-task", uri="s3://bucket/my-task", type="Asset")
],
outlet_events=[
{
"dest_asset_key": {"name": "s3://bucket/my-task", "uri": "s3://bucket/my-task"},
"extra": {"task_id": "asset-outlet-task"},
}
],
),
id="asset_with_template_extra",
),
pytest.param(
[Dataset(name="s3://bucket/my-task", uri="s3://bucket/my-task")],
SucceedTask(
@@ -954,10 +982,38 @@ def test_dag_parsing_context(make_ti_context, mock_supervisor_comms, monkeypatch
task_outlets=[
AssetProfile(name="s3://bucket/my-task", uri="s3://bucket/my-task", type="Asset")
],
outlet_events=[],
outlet_events=[
{
"dest_asset_key": {"name": "s3://bucket/my-task", "uri": "s3://bucket/my-task"},
"extra": {},
}
],
),
id="dataset",
),
pytest.param(
[
Dataset(
name="s3://bucket/my-task",
uri="s3://bucket/my-task",
extra={"task_id": "{{ task.task_id }}"},
)
],
SucceedTask(
state="success",
end_date=timezone.datetime(2024, 12, 3, 10, 0),
task_outlets=[
AssetProfile(name="s3://bucket/my-task", uri="s3://bucket/my-task", type="Asset")
],
outlet_events=[
{
"dest_asset_key": {"name": "s3://bucket/my-task", "uri": "s3://bucket/my-task"},
"extra": {"task_id": "asset-outlet-task"},
}
],
),
id="dataset_with_template_extra",
),
pytest.param(
[Model(name="s3://bucket/my-task", uri="s3://bucket/my-task")],
SucceedTask(
@@ -966,10 +1022,38 @@ def test_dag_parsing_context(make_ti_context, mock_supervisor_comms, monkeypatch
task_outlets=[
AssetProfile(name="s3://bucket/my-task", uri="s3://bucket/my-task", type="Asset")
],
outlet_events=[],
outlet_events=[
{
"dest_asset_key": {"name": "s3://bucket/my-task", "uri": "s3://bucket/my-task"},
"extra": {},
}
],
),
id="model",
),
pytest.param(
[
Model(
name="s3://bucket/my-task",
uri="s3://bucket/my-task",
extra={"task_id": "{{ task.task_id }}"},
)
],
SucceedTask(
state="success",
end_date=timezone.datetime(2024, 12, 3, 10, 0),
task_outlets=[
AssetProfile(name="s3://bucket/my-task", uri="s3://bucket/my-task", type="Asset")
],
outlet_events=[
{
"dest_asset_key": {"name": "s3://bucket/my-task", "uri": "s3://bucket/my-task"},
"extra": {"task_id": "asset-outlet-task"},
}
],
),
id="model_with_template_extra",
),
pytest.param(
[Asset.ref(name="s3://bucket/my-task")],
SucceedTask(