From 238ab3adb5a5ff0265adf61635df4e68f66c3a30 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Thu, 27 Nov 2025 14:27:11 +0530 Subject: [PATCH 1/3] Redact secrets in rendered templates properly to not expose them on UI --- .../src/airflow/sdk/execution_time/task_runner.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 3748607b50e3c..43f18509c94c0 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -788,9 +788,19 @@ def _serialize_rendered_fields(task: AbstractOperator) -> dict[str, JsonValue]: # TODO: Port one of the following to Task SDK # airflow.serialization.helpers.serialize_template_field or # airflow.models.renderedtifields.get_serialized_template_fields + from airflow.sdk._shared.secrets_masker import redact from airflow.serialization.helpers import serialize_template_field - return {field: serialize_template_field(getattr(task, field), field) for field in task.template_fields} + rendered_fields = {} + for field in task.template_fields: + value = getattr(task, field) + serialized = serialize_template_field(value, field) + # Redact secrets in the task process itself before sending to API server + # This ensures that the secrets those are registered via mask_secret() on workers / dag processor are properly masked + # on the UI. + rendered_fields[field] = redact(serialized, field) + + return rendered_fields # type: ignore[return-value] # Convince mypy that this is OK since we pass JsonValue to redact, so it will return the same def _build_asset_profiles(lineage_objects: list) -> Iterator[AssetProfile]: From 4240f621cca2325283a3694f9a17e0b44b805114 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Thu, 27 Nov 2025 15:08:32 +0530 Subject: [PATCH 2/3] adding a unit test --- .../execution_time/test_task_runner.py | 45 +++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py index e094da8d79161..de5cd30fe4fc0 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py +++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py @@ -89,6 +89,7 @@ GetVariable, GetXCom, GetXComSequenceSlice, + MaskSecret, OKResponse, PreviousDagRunResult, PrevSuccessfulDagRunResult, @@ -2662,6 +2663,50 @@ def execute(self, context): ) assert kwargs["from_email"] == self.FROM + def test_rendered_templates_mask_secrets(self, create_runtime_ti, mock_supervisor_comms): + """Test that secrets registered with mask_secret() are redacted in rendered template fields.""" + from airflow.sdk.log import mask_secret + + class CustomOperator(BaseOperator): + template_fields = ("username", "region") + + def __init__(self, username, region, *args, **kwargs): + super().__init__(*args, **kwargs) + self.username = username + self.region = region + + def execute(self, context): + # Only mask username + mask_secret(self.username) + + task = CustomOperator( + task_id="test_masking", + username="admin_user_12345", + region="us-west-2", + ) + + runtime_ti = create_runtime_ti(task=task, dag_id="test_secrets_in_rtif") + run(runtime_ti, context=runtime_ti.get_template_context(), log=mock.MagicMock()) + + assert ( + call( + msg=SetRenderedFields( + rendered_fields={"username": "admin_user_12345", "region": "us-west-2"}, + type="SetRenderedFields", + ) + ) + in mock_supervisor_comms.send.mock_calls + ) + assert ( + call(MaskSecret(value="admin_user_12345", name=None, type="MaskSecret")) + in mock_supervisor_comms.send.mock_calls + ) + # Region should not be masked + assert ( + call(MaskSecret(value="us-west-2", name=None, type="MaskSecret")) + not in mock_supervisor_comms.send.mock_calls + ) + class TestDagParamRuntime: DEFAULT_ARGS = { From 9deda62e20f3e2ab23e6d81cca3e9adb40872424 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Thu, 27 Nov 2025 15:34:26 +0530 Subject: [PATCH 3/3] adding a unit test --- .../execution_time/test_task_runner.py | 25 ++++++++++++------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py index de5cd30fe4fc0..8c6862ddb4e08 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py +++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py @@ -2663,10 +2663,16 @@ def execute(self, context): ) assert kwargs["from_email"] == self.FROM + @pytest.mark.enable_redact def test_rendered_templates_mask_secrets(self, create_runtime_ti, mock_supervisor_comms): """Test that secrets registered with mask_secret() are redacted in rendered template fields.""" + from unittest.mock import call + + from airflow.sdk._shared.secrets_masker import _secrets_masker from airflow.sdk.log import mask_secret + _secrets_masker().add_mask("admin_user_12345", None) + class CustomOperator(BaseOperator): template_fields = ("username", "region") @@ -2688,15 +2694,6 @@ def execute(self, context): runtime_ti = create_runtime_ti(task=task, dag_id="test_secrets_in_rtif") run(runtime_ti, context=runtime_ti.get_template_context(), log=mock.MagicMock()) - assert ( - call( - msg=SetRenderedFields( - rendered_fields={"username": "admin_user_12345", "region": "us-west-2"}, - type="SetRenderedFields", - ) - ) - in mock_supervisor_comms.send.mock_calls - ) assert ( call(MaskSecret(value="admin_user_12345", name=None, type="MaskSecret")) in mock_supervisor_comms.send.mock_calls @@ -2707,6 +2704,16 @@ def execute(self, context): not in mock_supervisor_comms.send.mock_calls ) + assert ( + call( + msg=SetRenderedFields( + rendered_fields={"username": "***", "region": "us-west-2"}, + type="SetRenderedFields", + ) + ) + in mock_supervisor_comms.send.mock_calls + ) + class TestDagParamRuntime: DEFAULT_ARGS = {