diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 3748607b50e3c..43f18509c94c0 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -788,9 +788,19 @@ def _serialize_rendered_fields(task: AbstractOperator) -> dict[str, JsonValue]: # TODO: Port one of the following to Task SDK # airflow.serialization.helpers.serialize_template_field or # airflow.models.renderedtifields.get_serialized_template_fields + from airflow.sdk._shared.secrets_masker import redact from airflow.serialization.helpers import serialize_template_field - return {field: serialize_template_field(getattr(task, field), field) for field in task.template_fields} + rendered_fields = {} + for field in task.template_fields: + value = getattr(task, field) + serialized = serialize_template_field(value, field) + # Redact secrets in the task process itself before sending to API server + # This ensures that the secrets those are registered via mask_secret() on workers / dag processor are properly masked + # on the UI. + rendered_fields[field] = redact(serialized, field) + + return rendered_fields # type: ignore[return-value] # Convince mypy that this is OK since we pass JsonValue to redact, so it will return the same def _build_asset_profiles(lineage_objects: list) -> Iterator[AssetProfile]: diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py index e094da8d79161..8c6862ddb4e08 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py +++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py @@ -89,6 +89,7 @@ GetVariable, GetXCom, GetXComSequenceSlice, + MaskSecret, OKResponse, PreviousDagRunResult, PrevSuccessfulDagRunResult, @@ -2662,6 +2663,57 @@ def execute(self, context): ) assert kwargs["from_email"] == self.FROM + @pytest.mark.enable_redact + def test_rendered_templates_mask_secrets(self, create_runtime_ti, mock_supervisor_comms): + """Test that secrets registered with mask_secret() are redacted in rendered template fields.""" + from unittest.mock import call + + from airflow.sdk._shared.secrets_masker import _secrets_masker + from airflow.sdk.log import mask_secret + + _secrets_masker().add_mask("admin_user_12345", None) + + class CustomOperator(BaseOperator): + template_fields = ("username", "region") + + def __init__(self, username, region, *args, **kwargs): + super().__init__(*args, **kwargs) + self.username = username + self.region = region + + def execute(self, context): + # Only mask username + mask_secret(self.username) + + task = CustomOperator( + task_id="test_masking", + username="admin_user_12345", + region="us-west-2", + ) + + runtime_ti = create_runtime_ti(task=task, dag_id="test_secrets_in_rtif") + run(runtime_ti, context=runtime_ti.get_template_context(), log=mock.MagicMock()) + + assert ( + call(MaskSecret(value="admin_user_12345", name=None, type="MaskSecret")) + in mock_supervisor_comms.send.mock_calls + ) + # Region should not be masked + assert ( + call(MaskSecret(value="us-west-2", name=None, type="MaskSecret")) + not in mock_supervisor_comms.send.mock_calls + ) + + assert ( + call( + msg=SetRenderedFields( + rendered_fields={"username": "***", "region": "us-west-2"}, + type="SetRenderedFields", + ) + ) + in mock_supervisor_comms.send.mock_calls + ) + class TestDagParamRuntime: DEFAULT_ARGS = {