diff --git a/task-sdk/src/airflow/sdk/execution_time/context.py b/task-sdk/src/airflow/sdk/execution_time/context.py index e8b4cb9b6a0ac..de5c77cb54635 100644 --- a/task-sdk/src/airflow/sdk/execution_time/context.py +++ b/task-sdk/src/airflow/sdk/execution_time/context.py @@ -39,6 +39,7 @@ BaseAssetUniqueKey, ) from airflow.sdk.exceptions import AirflowRuntimeError, ErrorType +from airflow.sdk.execution_time.secrets_masker import mask_secret if TYPE_CHECKING: from uuid import UUID @@ -180,6 +181,8 @@ def _get_variable(key: str, deserialize_json: bool) -> Any: import json var_val = json.loads(var_val) + if isinstance(var_val, str): + mask_secret(var_val, key) return var_val except Exception: log.exception( diff --git a/task-sdk/tests/task_sdk/definitions/test_variables.py b/task-sdk/tests/task_sdk/definitions/test_variables.py index c9e4698de800e..8bcf9ef28199a 100644 --- a/task-sdk/tests/task_sdk/definitions/test_variables.py +++ b/task-sdk/tests/task_sdk/definitions/test_variables.py @@ -19,6 +19,7 @@ import json from unittest import mock +from unittest.mock import patch import pytest @@ -132,6 +133,27 @@ def test_var_get_from_secrets_found_with_deserialize(self, mock_supervisor_comms retrieved_var_deser = Variable.get(key="VAR_A", deserialize_json=True) assert retrieved_var_deser == dict_data + @patch("airflow.sdk.execution_time.context.mask_secret") + def test_var_get_from_secrets_sensitive_key(self, mock_mask_secret, mock_supervisor_comms, tmp_path): + """Tests getting a variable from secrets backend when deserialize_json is provided.""" + path = tmp_path / "var.json" + data = {"secret": "super-secret"} + path.write_text(json.dumps(data, indent=4)) + + with conf_vars( + { + ( + "workers", + "secrets_backend", + ): "airflow.secrets.local_filesystem.LocalFilesystemBackend", + ("workers", "secrets_backend_kwargs"): f'{{"variables_file_path": "{path}"}}', + } + ): + retrieved_var = Variable.get(key="secret") + assert retrieved_var == "super-secret" + + mock_mask_secret.assert_called_with("super-secret", "secret") + @mock.patch("airflow.secrets.environment_variables.EnvironmentVariablesBackend.get_variable") def test_get_variable_env_var(self, mock_env_get, mock_supervisor_comms): """Tests getting a variable from environment variable."""