Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions task-sdk/src/airflow/sdk/execution_time/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
BaseAssetUniqueKey,
)
from airflow.sdk.exceptions import AirflowRuntimeError, ErrorType
from airflow.sdk.execution_time.secrets_masker import mask_secret

if TYPE_CHECKING:
from uuid import UUID
Expand Down Expand Up @@ -180,6 +181,8 @@ def _get_variable(key: str, deserialize_json: bool) -> Any:
import json

var_val = json.loads(var_val)
if isinstance(var_val, str):
mask_secret(var_val, key)
return var_val
except Exception:
log.exception(
Expand Down
22 changes: 22 additions & 0 deletions task-sdk/tests/task_sdk/definitions/test_variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import json
from unittest import mock
from unittest.mock import patch

import pytest

Expand Down Expand Up @@ -132,6 +133,27 @@ def test_var_get_from_secrets_found_with_deserialize(self, mock_supervisor_comms
retrieved_var_deser = Variable.get(key="VAR_A", deserialize_json=True)
assert retrieved_var_deser == dict_data

@patch("airflow.sdk.execution_time.context.mask_secret")
def test_var_get_from_secrets_sensitive_key(self, mock_mask_secret, mock_supervisor_comms, tmp_path):
"""Tests getting a variable from secrets backend when deserialize_json is provided."""
path = tmp_path / "var.json"
data = {"secret": "super-secret"}
path.write_text(json.dumps(data, indent=4))

with conf_vars(
{
(
"workers",
"secrets_backend",
): "airflow.secrets.local_filesystem.LocalFilesystemBackend",
("workers", "secrets_backend_kwargs"): f'{{"variables_file_path": "{path}"}}',
}
):
retrieved_var = Variable.get(key="secret")
assert retrieved_var == "super-secret"

mock_mask_secret.assert_called_with("super-secret", "secret")

@mock.patch("airflow.secrets.environment_variables.EnvironmentVariablesBackend.get_variable")
def test_get_variable_env_var(self, mock_env_get, mock_supervisor_comms):
"""Tests getting a variable from environment variable."""
Expand Down