diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 2f144e403de36..e6b19bf586e07 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1636,6 +1636,7 @@ repos: ^airflow-core/src/airflow/api/common/mark_tasks\.py$| ^airflow-core/src/airflow/api_fastapi/core_api/datamodels/assets\.py$| ^airflow-core/src/airflow/api_fastapi/core_api/datamodels/connections\.py$| + ^airflow-core/src/airflow/api_fastapi/core_api/services/public/connections\.py$| ^airflow-core/src/airflow/api_fastapi/core_api/datamodels/hitl\.py$| ^airflow-core/src/airflow/api_fastapi/core_api/datamodels/variables\.py$| ^airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid\.py$| diff --git a/airflow-core/src/airflow/api_fastapi/core_api/services/public/connections.py b/airflow-core/src/airflow/api_fastapi/core_api/services/public/connections.py index 4c236cf9756be..05ab88ab90c1a 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/services/public/connections.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/connections.py @@ -17,6 +17,8 @@ from __future__ import annotations +import json + from fastapi import HTTPException, status from pydantic import ValidationError from sqlalchemy import select @@ -32,6 +34,7 @@ from airflow.api_fastapi.core_api.datamodels.connections import ConnectionBody from airflow.api_fastapi.core_api.services.public.common import BulkService from airflow.models.connection import Connection +from airflow.sdk.execution_time.secrets_masker import merge def update_orm_from_pydantic( @@ -56,11 +59,23 @@ def update_orm_from_pydantic( if (not update_mask and "password" in pydantic_conn.model_fields_set) or ( update_mask and "password" in update_mask ): - orm_conn.set_password(pydantic_conn.password) + if pydantic_conn.password is None: + orm_conn.set_password(pydantic_conn.password) + else: + merged_password = merge(pydantic_conn.password, orm_conn.password, "password") + orm_conn.set_password(merged_password) if (not 
update_mask and "extra" in pydantic_conn.model_fields_set) or ( update_mask and "extra" in update_mask ): - orm_conn.set_extra(pydantic_conn.extra) + if pydantic_conn.extra is None or orm_conn.extra is None: + orm_conn.set_extra(pydantic_conn.extra) + return + try: + merged_extra = merge(json.loads(pydantic_conn.extra), json.loads(orm_conn.extra)) + orm_conn.set_extra(json.dumps(merged_extra)) + except json.JSONDecodeError: + # We can't merge fields in an unstructured `extra` + orm_conn.set_extra(pydantic_conn.extra) class BulkConnectionService(BulkService[ConnectionBody]): diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/en/admin.json b/airflow-core/src/airflow/ui/public/i18n/locales/en/admin.json index 1e90ba8266df3..0eb5df5b8a22d 100644 --- a/airflow-core/src/airflow/ui/public/i18n/locales/en/admin.json +++ b/airflow-core/src/airflow/ui/public/i18n/locales/en/admin.json @@ -36,6 +36,7 @@ "extraFields": "Extra Fields", "extraFieldsJson": "Extra Fields JSON", "helperText": "Connection type missing? 
Make sure you have installed the corresponding Airflow Providers Package.", + "helperTextForRedactedFields": "Redacted fields ('***') will remain unchanged if not modified.", "selectConnectionType": "Select Connection Type", "standardFields": "Standard Fields" }, diff --git a/airflow-core/src/airflow/ui/src/components/FlexibleForm/FlexibleForm.tsx b/airflow-core/src/airflow/ui/src/components/FlexibleForm/FlexibleForm.tsx index 4fea751e59960..cf9f2bf4b8518 100644 --- a/airflow-core/src/airflow/ui/src/components/FlexibleForm/FlexibleForm.tsx +++ b/airflow-core/src/airflow/ui/src/components/FlexibleForm/FlexibleForm.tsx @@ -32,12 +32,14 @@ export type FlexibleFormProps = { initialParamsDict: { paramsDict: ParamsSpec }; key?: string; setError: (error: boolean) => void; + subHeader?: string; }; export const FlexibleForm = ({ flexibleFormDefaultSection, initialParamsDict, setError, + subHeader, }: FlexibleFormProps) => { const { paramsDict: params, setInitialParamDict, setParamsDict } = useParamStore(); const processedSections = new Map(); @@ -126,6 +128,11 @@ export const FlexibleForm = ({ + {Boolean(subHeader) ? 
( + + {subHeader} + + ) : undefined} }> {Object.entries(params) .filter( diff --git a/airflow-core/src/airflow/ui/src/pages/Connections/ConnectionForm.tsx b/airflow-core/src/airflow/ui/src/pages/Connections/ConnectionForm.tsx index bf921a1de3c99..65a2d9ed6dae0 100644 --- a/airflow-core/src/airflow/ui/src/pages/Connections/ConnectionForm.tsx +++ b/airflow-core/src/airflow/ui/src/pages/Connections/ConnectionForm.tsx @@ -37,6 +37,7 @@ import type { ConnectionBody } from "./Connections"; type AddConnectionFormProps = { readonly error: unknown; readonly initialConnection: ConnectionBody; + readonly isEditMode?: boolean; readonly isPending: boolean; readonly mutateConnection: (requestBody: ConnectionBody) => void; }; @@ -44,6 +45,7 @@ type AddConnectionFormProps = { const ConnectionForm = ({ error, initialConnection, + isEditMode = false, isPending, mutateConnection, }: AddConnectionFormProps) => { @@ -202,6 +204,7 @@ const ConnectionForm = ({ initialParamsDict={paramsDic} key={selectedConnType} setError={setFormErrors} + subHeader={isEditMode ? translate("connections.form.helperTextForRedactedFields") : undefined} /> @@ -220,6 +223,11 @@ const ConnectionForm = ({ }} /> {Boolean(errors.conf) ? {errors.conf} : undefined} + {isEditMode ? 
( + + {translate("connections.form.helperTextForRedactedFields")} + + ) : undefined} )} /> diff --git a/airflow-core/src/airflow/ui/src/pages/Connections/EditConnectionButton.tsx b/airflow-core/src/airflow/ui/src/pages/Connections/EditConnectionButton.tsx index 1e3b7eccfabeb..f7c8c204ed0e7 100644 --- a/airflow-core/src/airflow/ui/src/pages/Connections/EditConnectionButton.tsx +++ b/airflow-core/src/airflow/ui/src/pages/Connections/EditConnectionButton.tsx @@ -81,6 +81,7 @@ const EditConnectionButton = ({ connection, disabled }: Props) => { diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_connections.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_connections.py index 6c56232a68a92..33acdef6f413e 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_connections.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_connections.py @@ -468,6 +468,27 @@ class TestPatchConnection(TestConnectionEndpoint): "schema": None, }, ), + ( + # Sensitive "***" should be ignored. 
+ { + "connection_id": TEST_CONN_ID, + "conn_type": TEST_CONN_TYPE, + "port": 80, + "login": "test_login_patch", + "password": "***", + }, + { + "conn_type": TEST_CONN_TYPE, + "connection_id": TEST_CONN_ID, + "description": TEST_CONN_DESCRIPTION, + "extra": None, + "host": TEST_CONN_HOST, + "login": "test_login_patch", + "password": None, + "port": 80, + "schema": None, + }, + ), ( { "connection_id": TEST_CONN_ID, diff --git a/task-sdk/src/airflow/sdk/execution_time/secrets_masker.py b/task-sdk/src/airflow/sdk/execution_time/secrets_masker.py index b6a7b359b3af4..1080d2f2d5d97 100644 --- a/task-sdk/src/airflow/sdk/execution_time/secrets_masker.py +++ b/task-sdk/src/airflow/sdk/execution_time/secrets_masker.py @@ -27,7 +27,7 @@ from enum import Enum from functools import cache, cached_property from re import Pattern -from typing import Any, TextIO, TypeAlias, TypeVar +from typing import Any, TextIO, TypeAlias, TypeVar, overload from airflow import settings @@ -116,6 +116,27 @@ def redact(value: Redactable, name: str | None = None, max_depth: int | None = N return _secrets_masker().redact(value, name, max_depth) +@overload +def merge(new_value: str, old_value: str, name: str | None = None, max_depth: int | None = None) -> str: ... + + +@overload +def merge(new_value: dict, old_value: dict, name: str | None = None, max_depth: int | None = None) -> str: ... + + +def merge( + new_value: Redacted, old_value: Redactable, name: str | None = None, max_depth: int | None = None +) -> Redacted: + """ + Merge a redacted value with its original unredacted counterpart. + + Takes a user-modified redacted value and merges it with the original unredacted value. + For sensitive fields that still contain "***" (unchanged), the original value is restored. + For fields that have been updated by the user, the new value is preserved. 
+ """ + return _secrets_masker().merge(new_value, old_value, name, max_depth) + + @cache def _secrets_masker() -> SecretsMasker: for flt in logging.getLogger("airflow.task").filters: @@ -292,6 +313,83 @@ def _redact(self, item: Redactable, name: str | None, depth: int, max_depth: int ) return item + def _merge( + self, + new_item: Redacted, + old_item: Redactable, + name: str | None, + depth: int, + max_depth: int, + force_sensitive: bool = False, + ) -> Redacted: + """Merge a redacted item with its original unredacted counterpart.""" + if depth > max_depth: + if isinstance(new_item, str) and new_item == "***": + return old_item + return new_item + + try: + # Determine if we should treat this as sensitive + is_sensitive = force_sensitive or (name is not None and should_hide_value_for_key(name)) + + if isinstance(new_item, dict) and isinstance(old_item, dict): + merged = {} + for key in new_item.keys(): + if key in old_item: + # For dicts, pass the key as name unless we're in sensitive mode + child_name = None if is_sensitive else key + merged[key] = self._merge( + new_item[key], + old_item[key], + name=child_name, + depth=depth + 1, + max_depth=max_depth, + force_sensitive=is_sensitive, + ) + else: + merged[key] = new_item[key] + return merged + + if isinstance(new_item, (list, tuple)) and type(old_item) is type(new_item): + merged_list = [] + for i in range(len(new_item)): + if i < len(old_item): + # In sensitive mode, check if individual item is redacted + if is_sensitive and isinstance(new_item[i], str) and new_item[i] == "***": + merged_list.append(old_item[i]) + else: + merged_list.append( + self._merge( + new_item[i], + old_item[i], + name=None, + depth=depth + 1, + max_depth=max_depth, + force_sensitive=is_sensitive, + ) + ) + else: + merged_list.append(new_item[i]) + + if isinstance(new_item, list): + return list(merged_list) + return tuple(merged_list) + + if isinstance(new_item, set) and isinstance(old_item, set): + # Sets are unordered, we cannot restore 
original items. + return new_item + + if _is_v1_env_var(new_item) and _is_v1_env_var(old_item): + # TODO: Handle Kubernetes V1EnvVar objects if needed + return new_item + + if is_sensitive and isinstance(new_item, str) and new_item == "***": + return old_item + return new_item + + except (TypeError, AttributeError, ValueError): + return new_item + def redact(self, item: Redactable, name: str | None = None, max_depth: int | None = None) -> Redacted: """ Redact an any secrets found in ``item``, if it is a string. @@ -302,6 +400,25 @@ def redact(self, item: Redactable, name: str | None = None, max_depth: int | Non """ return self._redact(item, name, depth=0, max_depth=max_depth or self.MAX_RECURSION_DEPTH) + def merge( + self, new_item: Redacted, old_item: Redactable, name: str | None = None, max_depth: int | None = None + ) -> Redacted: + """ + Merge a redacted item with its original unredacted counterpart. + + Takes a user-modified redacted item and merges it with the original unredacted item. + For sensitive fields that still contain "***" (unchanged), the original value is restored. + For fields that have been updated, the new value is preserved. 
class TestSecretsMaskerMerge:
    """Test the merge functionality for restoring original values from redacted data."""

    @pytest.mark.parametrize(
        ("new_value", "old_value", "name", "expected"),
        [
            ("***", "original_secret", "password", "original_secret"),
            ("new_secret", "original_secret", "password", "new_secret"),
            ("***", "original_value", "normal_field", "***"),
            ("new_value", "original_value", "normal_field", "new_value"),
            ("***", "original_value", None, "***"),
            ("new_value", "original_value", None, "new_value"),
        ],
    )
    @pytest.mark.usefixtures("patched_secrets_masker")
    def test_merge_simple_strings(self, new_value, old_value, name, expected):
        """A plain string is restored only when it is ``"***"`` under a sensitive name."""
        result = merge(new_value, old_value, name)
        assert result == expected

    @pytest.mark.parametrize(
        ("old_data", "new_data", "expected"),
        [
            (
                {
                    "password": "original_password",
                    "api_key": "original_api_key",
                    "normal_field": "original_normal",
                },
                {
                    "password": "***",
                    "api_key": "new_api_key",
                    "normal_field": "new_normal",
                },
                {
                    "password": "original_password",
                    "api_key": "new_api_key",
                    "normal_field": "new_normal",
                },
            ),
            (
                {
                    "config": {"password": "original_password", "host": "original_host"},
                    "credentials": {"api_key": "original_api_key", "username": "original_user"},
                },
                {
                    "config": {
                        "password": "***",
                        "host": "new_host",
                    },
                    "credentials": {
                        "api_key": "new_api_key",
                        "username": "new_user",
                    },
                },
                {
                    "config": {
                        "password": "original_password",
                        "host": "new_host",
                    },
                    "credentials": {
                        "api_key": "new_api_key",
                        "username": "new_user",
                    },
                },
            ),
        ],
    )
    @pytest.mark.usefixtures("patched_secrets_masker")
    def test_merge_dictionaries(self, old_data, new_data, expected):
        """Flat and nested dicts keep edited values and restore untouched placeholders."""
        result = merge(new_data, old_data)
        assert result == expected

    @pytest.mark.parametrize(
        ("old_data", "new_data", "name", "expected"),
        [
            # Lists
            (
                ["original_item1", "original_item2", "original_item3"],
                ["new_item1", "new_item2"],
                None,
                ["new_item1", "new_item2"],
            ),
            (
                ["original_item1", "original_item2"],
                ["new_item1", "new_item2", "new_item3", "new_item4"],
                None,
                ["new_item1", "new_item2", "new_item3", "new_item4"],
            ),
            (
                ["secret1", "secret2", "secret3"],
                ["***", "new_secret2", "***"],
                "password",
                ["secret1", "new_secret2", "secret3"],
            ),
            (
                ["value1", "value2", "value3"],
                ["***", "new_value2", "***"],
                "normal_list",
                ["***", "new_value2", "***"],
            ),
            # Tuples
            (
                ("original_item1", "original_item2", "original_item3"),
                ("new_item1", "new_item2"),
                None,
                ("new_item1", "new_item2"),
            ),
            (
                ("original_item1", "original_item2"),
                ("new_item1", "new_item2", "new_item3", "new_item4"),
                None,
                ("new_item1", "new_item2", "new_item3", "new_item4"),
            ),
            (
                ("secret1", "secret2", "secret3"),
                ("***", "new_secret2", "***"),
                "password",
                ("secret1", "new_secret2", "secret3"),
            ),
            (
                ("value1", "value2", "value3"),
                ("***", "new_value2", "***"),
                "normal_tuple",
                ("***", "new_value2", "***"),
            ),
            # Sets (unordered, so the new set is always returned as-is)
            (
                {"original_item1", "original_item2", "original_item3"},
                {"new_item1", "new_item2"},
                None,
                {"new_item1", "new_item2"},
            ),
            (
                {"original_item1", "original_item2"},
                {"new_item1", "new_item2", "new_item3", "new_item4"},
                None,
                {"new_item1", "new_item2", "new_item3", "new_item4"},
            ),
            (
                {"secret1", "secret2", "secret3"},
                {"***", "new_secret2"},
                "password",
                {"***", "new_secret2"},
            ),
            (
                {"value1", "value2", "value3"},
                {"***", "new_value2"},
                "normal_set",
                {"***", "new_value2"},
            ),
        ],
    )
    @pytest.mark.usefixtures("patched_secrets_masker")
    def test_merge_collections(self, old_data, new_data, name, expected):
        """Lists and tuples are merged positionally; sets are returned unchanged."""
        result = merge(new_data, old_data, name)
        assert result == expected

    @pytest.mark.usefixtures("patched_secrets_masker")
    def test_merge_mismatched_types(self):
        """When old and new values have different types, the new value wins."""
        old_data = {"key": "value"}
        new_data = "some_string"  # Different type

        # When types don't match, prefer the new item
        expected = "some_string"

        result = merge(new_data, old_data)
        assert result == expected

    @pytest.mark.usefixtures("patched_secrets_masker")
    def test_merge_with_missing_keys(self):
        """Keys only in the old data are dropped; keys only in the new data are kept."""
        old_data = {"password": "original_password", "old_only_key": "old_value", "common_key": "old_common"}

        new_data = {
            "password": "***",
            "new_only_key": "new_value",
            "common_key": "new_common",
        }

        expected = {
            "password": "original_password",
            "new_only_key": "new_value",
            "common_key": "new_common",
        }

        result = merge(new_data, old_data)
        assert result == expected

    @pytest.mark.usefixtures("patched_secrets_masker")
    def test_merge_complex_redacted_structures(self):
        """Placeholders nested in dicts and lists are restored from the old data."""
        old_data = {
            "some_config": {
                "nested_password": "original_nested_password",
                "passwords": ["item1", "item2"],
            },
            "normal_field": "normal_value",
        }

        new_data = {
            "some_config": {"nested_password": "***", "passwords": ["***", "new_item2"]},
            "normal_field": "new_normal_value",
        }

        result = merge(new_data, old_data)
        expected = {
            "some_config": {
                "nested_password": "original_nested_password",
                "passwords": ["item1", "new_item2"],
            },
            "normal_field": "new_normal_value",
        }
        assert result == expected

    @pytest.mark.usefixtures("patched_secrets_masker")
    def test_merge_partially_redacted_structures(self):
        """Edited and untouched fields can be mixed at several nesting levels."""
        old_data = {
            "config": {
                "password": "original_password",
                "host": "original_host",
                "nested": {"api_key": "original_api_key", "timeout": 30},
            }
        }

        new_data = {
            "config": {
                "password": "***",
                "host": "new_host",
                "nested": {
                    "api_key": "***",
                    "timeout": 60,
                },
            }
        }

        expected = {
            "config": {
                "password": "original_password",
                "host": "new_host",
                "nested": {
                    "api_key": "original_api_key",
                    "timeout": 60,
                },
            }
        }

        result = merge(new_data, old_data)
        assert result == expected

    @pytest.mark.usefixtures("patched_secrets_masker")
    def test_merge_max_depth(self):
        """Structures nested deeper than ``max_depth`` are returned as submitted."""
        old_data = {"level1": {"level2": {"level3": {"password": "original_password"}}}}
        new_data = {"level1": {"level2": {"level3": {"password": "***"}}}}

        result = merge(new_data, old_data, max_depth=1)
        assert result == new_data

        result = merge(new_data, old_data, max_depth=10)
        assert result["level1"]["level2"]["level3"]["password"] == "original_password"

    @pytest.mark.usefixtures("patched_secrets_masker")
    def test_merge_enum_values(self):
        """Enum members pass through merge unchanged and keep their type."""
        old_enum = MyEnum.testname
        new_enum = MyEnum.testname2

        result = merge(new_enum, old_enum)
        assert result == new_enum
        assert isinstance(result, MyEnum)

    @pytest.mark.usefixtures("patched_secrets_masker")
    def test_merge_round_trip(self):
        """Redact, edit, then merge: edits are kept and untouched secrets come back."""
        import copy

        # Original data with sensitive information
        original_config = {
            "database": {"host": "db.example.com", "password": "super_secret_password", "username": "admin"},
            "api": {"api_key": "secret_api_key_12345", "endpoint": "https://api.example.com", "timeout": 30},
            "app_name": "my_application",
        }

        # Step 1: Redact the original data
        redacted_dict = redact(original_config)

        # Verify sensitive fields are redacted
        assert redacted_dict["database"]["password"] == "***"
        assert redacted_dict["api"]["api_key"] == "***"
        assert redacted_dict["database"]["host"] == "db.example.com"

        # Step 2: User modifies some fields.
        # Deep copy so the nested edits below do not also mutate ``redacted_dict``
        # (a shallow ``dict.copy()`` would share the nested dicts).
        updated_dict = copy.deepcopy(redacted_dict)
        updated_dict["database"]["host"] = "new-db.example.com"
        updated_dict["api"]["timeout"] = 60
        updated_dict["api"]["api_key"] = "new_api_key_67890"
        # User left password as "***" (unchanged)

        # Step 3: Merge to restore unchanged sensitive values
        final_dict = merge(updated_dict, original_config)

        # Verify the results
        assert final_dict["database"]["password"] == "super_secret_password"  # Restored
        assert final_dict["database"]["host"] == "new-db.example.com"  # User modification kept
        assert final_dict["api"]["api_key"] == "new_api_key_67890"  # User modification kept
        assert final_dict["api"]["timeout"] == 60  # User modification kept
        assert final_dict["app_name"] == "my_application"  # Unchanged