diff --git a/airflow-core/docs/authoring-and-scheduling/assets.rst b/airflow-core/docs/authoring-and-scheduling/assets.rst index 366a4069ca52b..394991e2029f7 100644 --- a/airflow-core/docs/authoring-and-scheduling/assets.rst +++ b/airflow-core/docs/authoring-and-scheduling/assets.rst @@ -104,6 +104,27 @@ If needed, you can include an extra dictionary in an asset: This can be used to supply custom description to the asset, such as who has ownership to the target file, or what the file is for. The extra information does not affect an asset's identity. +You can also use Jinja templating in the extra dictionary to enrich the asset with runtime information, such as the execution date of the task that emits events of the asset: + +.. code-block:: python + + BashOperator( + task_id="write_example_asset", + bash_command="echo 'writing...'", + outlets=Asset( + "asset_example", + extra={ + "static_extra": "value", + "dag_id": "{{ dag.dag_id }}", + "nested_extra": { + "run_id": "{{ run_id }}", + "logical_date": "{{ ds }}", + } + } + ), + ) + + .. note:: **Security Note:** Asset URI and extra fields are not encrypted, they are stored in cleartext in Airflow's metadata database. Do NOT store any sensitive values, especially credentials, in either asset URIs or extra key values! 
Creating a task to emit asset events diff --git a/airflow-core/tests/unit/models/test_taskinstance.py b/airflow-core/tests/unit/models/test_taskinstance.py index f5bf36bcf4e0e..a730fa8d8d8df 100644 --- a/airflow-core/tests/unit/models/test_taskinstance.py +++ b/airflow-core/tests/unit/models/test_taskinstance.py @@ -64,7 +64,7 @@ from airflow.providers.standard.operators.empty import EmptyOperator from airflow.providers.standard.operators.python import PythonOperator from airflow.providers.standard.sensors.python import PythonSensor -from airflow.sdk import BaseSensorOperator, task, task_group +from airflow.sdk import BaseSensorOperator, Metadata, task, task_group from airflow.sdk.api.datamodels._generated import AssetEventResponse, AssetResponse from airflow.sdk.definitions.asset import Asset, AssetAlias from airflow.sdk.definitions.param import process_params @@ -1653,6 +1653,98 @@ def _write2_post_execute(context, _): assert events["write2"].asset.uri == "test_outlet_asset_extra_2" assert events["write2"].extra == {"x": 1} + @pytest.mark.want_activate_assets(True) + def test_outlet_asset_template_extra(self, dag_maker, session): + from airflow.sdk.definitions.asset import Asset + + with dag_maker(schedule=None, serialized=True, session=session): + + @task( + outlets=Asset( + "test_outlet_asset_template_extra1", + extra={ + "static_extra": "value", + "dag_id": "{{ dag.dag_id }}", + "nested_extra": { + "task_id": "{{ task.task_id }}", + "logical_date": "{{ ds }}", + }, + }, + ) + ) + def write_template1(*, outlet_events): + yield Metadata( + Asset("test_outlet_asset_template_extra1"), + { + "dag_id": "override_dag_id", + "some_other_key": "some_other_value", + }, + ) + + write_template1() + + BashOperator( + task_id="write_template2", + bash_command=":", + outlets=Asset( + "test_outlet_asset_template_extra2", + extra={ + "static_extra": "value", + "dag_id": "{{ dag.dag_id }}", + "nested_extra": { + "task_id": "{{ task.task_id }}", + "logical_date": "{{ ds }}", + 
}, + }, + ), + ) + + BashOperator( + task_id="write_asset_no_extra", + bash_command=":", + outlets=Asset("test_outlet_asset_no_extra"), + ) + + dr: DagRun = dag_maker.create_dagrun() + for ti in dr.get_task_instances(session=session): + ti.run(session=session) + + events = dict(iter(session.execute(select(AssetEvent.source_task_id, AssetEvent)))) + assert set(events) == {"write_template1", "write_template2", "write_asset_no_extra"} + + assert events["write_template1"].source_dag_id == dr.dag_id + assert events["write_template1"].source_run_id == dr.run_id + assert events["write_template1"].source_task_id == "write_template1" + assert events["write_template1"].asset.uri == "test_outlet_asset_template_extra1" + assert events["write_template1"].extra == { + "static_extra": "value", + "dag_id": "override_dag_id", # Overridden by Metadata + "nested_extra": { + "task_id": "write_template1", + "logical_date": dr.logical_date.strftime("%Y-%m-%d"), + }, + "some_other_key": "some_other_value", # Added by Metadata + } + + assert events["write_template2"].source_dag_id == dr.dag_id + assert events["write_template2"].source_run_id == dr.run_id + assert events["write_template2"].source_task_id == "write_template2" + assert events["write_template2"].asset.uri == "test_outlet_asset_template_extra2" + assert events["write_template2"].extra == { + "static_extra": "value", + "dag_id": dr.dag_id, + "nested_extra": { + "task_id": "write_template2", + "logical_date": dr.logical_date.strftime("%Y-%m-%d"), + }, + } + + assert events["write_asset_no_extra"].source_dag_id == dr.dag_id + assert events["write_asset_no_extra"].source_run_id == dr.run_id + assert events["write_asset_no_extra"].source_task_id == "write_asset_no_extra" + assert events["write_asset_no_extra"].asset.uri == "test_outlet_asset_no_extra" + assert events["write_asset_no_extra"].extra == {} + @pytest.mark.want_activate_assets(True) def test_outlet_asset_extra_ignore_different(self, dag_maker, session): from 
airflow.sdk.definitions.asset import Asset diff --git a/task-sdk/src/airflow/sdk/definitions/_internal/templater.py b/task-sdk/src/airflow/sdk/definitions/_internal/templater.py index 29f5fce7d1bfc..9d24de475c536 100644 --- a/task-sdk/src/airflow/sdk/definitions/_internal/templater.py +++ b/task-sdk/src/airflow/sdk/definitions/_internal/templater.py @@ -89,7 +89,7 @@ def prepare_template(self) -> None: def resolve_template_files(self) -> None: """Get the content of files for template_field / template_ext.""" - if self.template_ext: + if getattr(self, "template_ext", None): for field in self.template_fields: content = getattr(self, field, None) if isinstance(content, str) and content.endswith(tuple(self.template_ext)): @@ -170,7 +170,7 @@ def render_template( jinja_env = self.get_template_env() if isinstance(value, str): - if value.endswith(tuple(self.template_ext)): # A filepath. + if hasattr(self, "template_ext") and value.endswith(tuple(self.template_ext)): # A filepath. template = jinja_env.get_template(value) else: template = jinja_env.from_string(value) diff --git a/task-sdk/src/airflow/sdk/definitions/asset/__init__.py b/task-sdk/src/airflow/sdk/definitions/asset/__init__.py index fae1ec79afec4..d136d4ad9db5e 100644 --- a/task-sdk/src/airflow/sdk/definitions/asset/__init__.py +++ b/task-sdk/src/airflow/sdk/definitions/asset/__init__.py @@ -29,13 +29,17 @@ import attrs from airflow.sdk.api.datamodels._generated import AssetProfile +from airflow.sdk.definitions._internal.templater import Templater from airflow.serialization.dag_dependency import DagDependency if TYPE_CHECKING: from collections.abc import Iterable, Iterator from urllib.parse import SplitResult + import jinja2 + from airflow.models.asset import AssetModel + from airflow.sdk import Context from airflow.sdk.io.path import ObjectStoragePath from airflow.serialization.serialized_objects import SerializedAssetWatcher from airflow.triggers.base import BaseEventTrigger @@ -305,7 +309,7 @@ def __init__( 
@attrs.define(init=False, unsafe_hash=False) -class Asset(os.PathLike, BaseAsset): +class Asset(os.PathLike, BaseAsset, Templater): """A representation of data asset dependencies between workflows.""" name: str = attrs.field( @@ -489,6 +493,22 @@ def asprofile(self) -> AssetProfile: """ return AssetProfile(name=self.name or None, uri=self.uri or None, type=Asset.__name__) + def render_extra_field( + self, + context: Context, + jinja_env: jinja2.Environment | None = None, + ) -> None: + """ + Template extra attribute. + + :param context: Context dict with values to apply on content. + :param jinja_env: Jinja environment to use for rendering. + """ + dag = context["dag"] + if not jinja_env: + jinja_env = self.get_template_env(dag=dag) + self._do_render_template_fields(self, ("extra",), context, jinja_env, set()) + class AssetRef(BaseAsset, AttrsInstance): """ diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 04baa2400f690..19c67e4c4ecd9 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -1270,6 +1270,11 @@ def _execute_task(context: Context, ti: RuntimeTaskInstance, log: Logger): outlet_events = context_get_outlet_events(context) + for outlet in task.outlets or (): + if isinstance(outlet, Asset): + outlet.render_extra_field(context, jinja_env=task.dag.get_template_env()) + outlet_events[outlet].extra.update(outlet.extra) + if (pre_execute_hook := task._pre_execute_hook) is not None: create_executable_runner(pre_execute_hook, outlet_events, logger=log).run(context) if getattr(pre_execute_hook := task.pre_execute, "__func__", None) is not BaseOperator.pre_execute: diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py index a34447f599dc4..5abb07c6bdcba 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py +++ 
b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py @@ -942,10 +942,38 @@ def test_dag_parsing_context(make_ti_context, mock_supervisor_comms, monkeypatch task_outlets=[ AssetProfile(name="s3://bucket/my-task", uri="s3://bucket/my-task", type="Asset") ], - outlet_events=[], + outlet_events=[ + { + "dest_asset_key": {"name": "s3://bucket/my-task", "uri": "s3://bucket/my-task"}, + "extra": {}, + } + ], ), id="asset", ), + pytest.param( + [ + Asset( + name="s3://bucket/my-task", + uri="s3://bucket/my-task", + extra={"task_id": "{{ task.task_id }}"}, + ) + ], + SucceedTask( + state="success", + end_date=timezone.datetime(2024, 12, 3, 10, 0), + task_outlets=[ + AssetProfile(name="s3://bucket/my-task", uri="s3://bucket/my-task", type="Asset") + ], + outlet_events=[ + { + "dest_asset_key": {"name": "s3://bucket/my-task", "uri": "s3://bucket/my-task"}, + "extra": {"task_id": "asset-outlet-task"}, + } + ], + ), + id="asset_with_template_extra", + ), pytest.param( [Dataset(name="s3://bucket/my-task", uri="s3://bucket/my-task")], SucceedTask( @@ -954,10 +982,38 @@ def test_dag_parsing_context(make_ti_context, mock_supervisor_comms, monkeypatch task_outlets=[ AssetProfile(name="s3://bucket/my-task", uri="s3://bucket/my-task", type="Asset") ], - outlet_events=[], + outlet_events=[ + { + "dest_asset_key": {"name": "s3://bucket/my-task", "uri": "s3://bucket/my-task"}, + "extra": {}, + } + ], ), id="dataset", ), + pytest.param( + [ + Dataset( + name="s3://bucket/my-task", + uri="s3://bucket/my-task", + extra={"task_id": "{{ task.task_id }}"}, + ) + ], + SucceedTask( + state="success", + end_date=timezone.datetime(2024, 12, 3, 10, 0), + task_outlets=[ + AssetProfile(name="s3://bucket/my-task", uri="s3://bucket/my-task", type="Asset") + ], + outlet_events=[ + { + "dest_asset_key": {"name": "s3://bucket/my-task", "uri": "s3://bucket/my-task"}, + "extra": {"task_id": "asset-outlet-task"}, + } + ], + ), + id="dataset_with_template_extra", + ), pytest.param( 
[Model(name="s3://bucket/my-task", uri="s3://bucket/my-task")], SucceedTask( @@ -966,10 +1022,38 @@ def test_dag_parsing_context(make_ti_context, mock_supervisor_comms, monkeypatch task_outlets=[ AssetProfile(name="s3://bucket/my-task", uri="s3://bucket/my-task", type="Asset") ], - outlet_events=[], + outlet_events=[ + { + "dest_asset_key": {"name": "s3://bucket/my-task", "uri": "s3://bucket/my-task"}, + "extra": {}, + } + ], ), id="model", ), + pytest.param( + [ + Model( + name="s3://bucket/my-task", + uri="s3://bucket/my-task", + extra={"task_id": "{{ task.task_id }}"}, + ) + ], + SucceedTask( + state="success", + end_date=timezone.datetime(2024, 12, 3, 10, 0), + task_outlets=[ + AssetProfile(name="s3://bucket/my-task", uri="s3://bucket/my-task", type="Asset") + ], + outlet_events=[ + { + "dest_asset_key": {"name": "s3://bucket/my-task", "uri": "s3://bucket/my-task"}, + "extra": {"task_id": "asset-outlet-task"}, + } + ], + ), + id="model_with_template_extra", + ), pytest.param( [Asset.ref(name="s3://bucket/my-task")], SucceedTask(