Merged
Changes from all commits
Commits
59 commits
36d72c2
Move secrets_masker over to airflow_shared distribution
amoghrajesh Aug 13, 2025
d6317ce
better deps
amoghrajesh Aug 13, 2025
47dc00b
update in core
amoghrajesh Aug 13, 2025
105d96c
remove secondary link
amoghrajesh Aug 13, 2025
8c79de0
fixing task sdk tests
amoghrajesh Aug 13, 2025
4439cdc
fixing imports in core and providers
amoghrajesh Aug 13, 2025
89d23f6
fixing api tests
amoghrajesh Aug 14, 2025
707f660
fixing provider tests
amoghrajesh Aug 14, 2025
9e612ba
fixing api tests
amoghrajesh Aug 14, 2025
b8d6d3b
fixing k8s tests
amoghrajesh Aug 14, 2025
bd992d5
fixing core tests
amoghrajesh Aug 14, 2025
c90351e
making the secrets masker a global instance
amoghrajesh Aug 18, 2025
43fa3d6
patch the right target
amoghrajesh Aug 18, 2025
5fa664b
fixing cncf tests
amoghrajesh Aug 18, 2025
1057802
updating filters with correct paths
amoghrajesh Aug 18, 2025
65a540c
Merge branch 'main' into move-secrets-masker-to-shared
amoghrajesh Aug 18, 2025
f1576d4
Merge branch 'main' into move-secrets-masker-to-shared
amoghrajesh Aug 18, 2025
2eb4147
attempt to fix tests
amoghrajesh Aug 18, 2025
db0eed3
trying to fix the tests
amoghrajesh Aug 18, 2025
d3a75e0
core tests
amoghrajesh Aug 18, 2025
b3a5232
Merge branch 'main' into move-secrets-masker-to-shared
amoghrajesh Aug 19, 2025
15ca378
use task sdk secrets masker
amoghrajesh Aug 19, 2025
6aa3456
fix the fixture
amoghrajesh Aug 19, 2025
140c274
sdk masker in conf too
amoghrajesh Aug 19, 2025
3b0b346
remove reset in dag processor
amoghrajesh Aug 19, 2025
90669c0
fixing cli test
amoghrajesh Aug 19, 2025
47e1ae2
correcting fixture
amoghrajesh Aug 19, 2025
9a02776
making the import pattern better
amoghrajesh Aug 19, 2025
b52b964
fixing docstring
amoghrajesh Aug 19, 2025
01ce913
consistent usage
amoghrajesh Aug 19, 2025
977f580
fixing core test
amoghrajesh Aug 19, 2025
73c7818
Merge branch 'main' into move-secrets-masker-to-shared
amoghrajesh Aug 19, 2025
df2b467
fixing sdk tests
amoghrajesh Aug 19, 2025
4946c5e
Merge branch 'main' into move-secrets-masker-to-shared
amoghrajesh Aug 20, 2025
ac4a857
re ref from shared not sdk
amoghrajesh Aug 20, 2025
281861a
fixing tests
amoghrajesh Aug 20, 2025
6f0250a
mask secret on getting var
amoghrajesh Aug 20, 2025
6cac064
trying to remove dependency on settings in tests
amoghrajesh Aug 20, 2025
32161a0
trying to remove dependency on settings in tests
amoghrajesh Aug 20, 2025
c4ec151
Merge branch 'main' into move-secrets-masker-to-shared
amoghrajesh Aug 21, 2025
4999d9a
adding shared dep to core and updating script to handle duplicate deps
amoghrajesh Aug 21, 2025
23865ee
fixing task sdk tests
amoghrajesh Aug 21, 2025
1abbdc3
ah the relative import in use
amoghrajesh Aug 21, 2025
3283c6a
normalising the name of the library
amoghrajesh Aug 21, 2025
1029e1c
better way of log masking setting
amoghrajesh Aug 21, 2025
011f359
better way of log masking setting
amoghrajesh Aug 21, 2025
8880440
Merge branch 'main' into move-secrets-masker-to-shared
amoghrajesh Aug 22, 2025
2c81740
review 1 from kaxil
amoghrajesh Aug 22, 2025
4a87939
removing masking from processor
amoghrajesh Aug 22, 2025
dcbe01b
public variable
amoghrajesh Aug 22, 2025
635b21d
fixing pyprojects
amoghrajesh Aug 22, 2025
8638f1b
adding workflow for shared masker
amoghrajesh Aug 22, 2025
7853f62
:facepalm
amoghrajesh Aug 22, 2025
2414958
fix runner
amoghrajesh Aug 22, 2025
e3025fd
correct name of library with normalize changes
amoghrajesh Aug 22, 2025
cb78a2a
wrong name
amoghrajesh Aug 22, 2025
4b28b83
cleaning code up
amoghrajesh Aug 22, 2025
632db50
add pydantic as dependency
amoghrajesh Aug 22, 2025
b399d45
fixing tests
amoghrajesh Aug 22, 2025
18 changes: 18 additions & 0 deletions .github/workflows/basic-tests.yml
@@ -86,6 +86,24 @@ jobs:
uses: ./.github/actions/breeze
- run: uv tool run --from apache-airflow-breeze pytest -n auto --color=yes
working-directory: ./dev/breeze/
tests-shared-secrets-masker:
timeout-minutes: 10
name: Shared secrets masker tests
runs-on: ${{ fromJSON(inputs.runners) }}
steps:
- name: "Cleanup repo"
shell: bash
run: docker run -v "${GITHUB_WORKSPACE}:/workspace" -u 0:0 bash -c "rm -rf /workspace/*"
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
persist-credentials: false
- name: "Prepare and cleanup runner"
run: ./scripts/ci/prepare_and_cleanup_runner.sh
- name: "Install Breeze"
uses: ./.github/actions/breeze
- name: "Run shared secrets masker tests"
run: uv run --group dev pytest --color=yes
working-directory: ./shared/secrets_masker/
tests-ui:
timeout-minutes: 15
name: React UI tests
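The new `tests-shared-secrets-masker` job simply runs pytest with the `dev` dependency group inside `shared/secrets_masker/`. For local iteration, a rough equivalent (sketched in Python purely for illustration; it assumes `uv` is on PATH and the repository root is the working directory) would be:

```python
# Hypothetical local equivalent of the CI step above; not part of the PR itself.
import subprocess

subprocess.run(
    ["uv", "run", "--group", "dev", "pytest", "--color=yes"],
    cwd="shared/secrets_masker",  # same working-directory as the workflow job
    check=True,
)
```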
3 changes: 2 additions & 1 deletion .pre-commit-config.yaml
@@ -1719,6 +1719,7 @@ repos:
^airflow-core/src/airflow/settings\.py$|
^airflow-core/src/airflow/models/renderedtifields\.py$|
^airflow-core/src/airflow/serialization/helpers\.py$|
^airflow-core/src/airflow/models/expandinput\.py$
^airflow-core/src/airflow/models/expandinput\.py$|
^airflow-core/src/airflow/cli/commands/triggerer_command.py$
additional_dependencies: ['rich>=12.4.4']
## ONLY ADD PREK HOOKS HERE THAT REQUIRE CI IMAGE
7 changes: 7 additions & 0 deletions airflow-core/pyproject.toml
@@ -212,6 +212,7 @@ exclude = [

[tool.hatch.build.targets.sdist.force-include]
"../shared/timezones/src/airflow_shared/timezones" = "src/airflow/_shared/timezones"
"../shared/secrets_masker/src/airflow_shared/secrets_masker" = "src/airflow/_shared/secrets_masker"

[tool.hatch.build.targets.custom]
path = "./hatch_build.py"
@@ -276,3 +277,9 @@ required-version = ">=0.6.3"
[tool.uv.sources]
apache-airflow-core = {workspace = true}
apache-airflow-devel-common = { workspace = true }

[tool.airflow]
shared_distributions = [
"apache-airflow-shared-timezones",
"apache-airflow-shared-secrets-masker",
]
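The `force-include` entry above vendors the shared sources into the core distribution as `airflow._shared.secrets_masker`, which is the import path the rest of this diff switches core code to. A minimal sketch of using the relocated helpers (the function names mirror the pre-existing secrets-masker API; the module path is taken from this PR and may change before release):

```python
# Sketch only: the import path comes from this PR's force-include mapping; the
# mask_secret/redact helpers mirror the existing secrets-masker API.
from airflow._shared.secrets_masker import mask_secret, redact

mask_secret("s3cr3t-token")  # register a literal value so later output hides it
print(redact({"password": "p@ss", "note": "ok"}))
# roughly: {'password': '***', 'note': 'ok'} -- sensitive keys are masked
```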
1 change: 1 addition & 0 deletions airflow-core/src/airflow/_shared/secrets_masker
(next file; file name not shown)
@@ -21,8 +21,8 @@

from pydantic import AliasPath, Field, NonNegativeInt, field_validator

from airflow._shared.secrets_masker import redact
from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel
from airflow.sdk.execution_time.secrets_masker import redact


class DagScheduleAssetReference(StrictBaseModel):
(next file; file name not shown)
@@ -24,8 +24,8 @@
from pydantic import Field, field_validator
from pydantic_core.core_schema import ValidationInfo

from airflow._shared.secrets_masker import redact
from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel
from airflow.sdk.execution_time.secrets_masker import redact


# Response Models
(next file; file name not shown)
@@ -21,9 +21,9 @@

from pydantic import Field, JsonValue, model_validator

from airflow._shared.secrets_masker import redact
from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel
from airflow.models.base import ID_LEN
from airflow.sdk.execution_time.secrets_masker import redact
from airflow.typing_compat import Self


(next file; file name not shown)
@@ -23,6 +23,7 @@
from pydantic import ValidationError
from sqlalchemy import select

from airflow._shared.secrets_masker import merge
from airflow.api_fastapi.core_api.datamodels.common import (
BulkActionNotOnExistence,
BulkActionOnExistence,
@@ -34,7 +35,6 @@
from airflow.api_fastapi.core_api.datamodels.connections import ConnectionBody
from airflow.api_fastapi.core_api.services.public.common import BulkService
from airflow.models.connection import Connection
from airflow.sdk.execution_time.secrets_masker import merge


def update_orm_from_pydantic(
(next file; file name not shown)
@@ -24,10 +24,10 @@
from fastapi import Request
from pendulum.parsing.exceptions import ParserError

from airflow._shared.secrets_masker import secrets_masker
from airflow.api_fastapi.common.db.common import SessionDep
from airflow.api_fastapi.core_api.security import GetUserDep
from airflow.models import Log
from airflow.sdk.execution_time import secrets_masker

logger = logging.getLogger(__name__)

8 changes: 6 additions & 2 deletions airflow-core/src/airflow/cli/commands/task_command.py
@@ -38,7 +38,6 @@
from airflow.models.dagrun import DagRun
from airflow.sdk.definitions.dag import DAG, _run_task
from airflow.sdk.definitions.param import ParamsDict
from airflow.sdk.execution_time.secrets_masker import RedactedIO
from airflow.serialization.serialized_objects import SerializedDAG
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.dependencies_deps import SCHEDULER_QUEUED_DEPS
@@ -370,7 +369,9 @@ def task_test(args, dag: DAG | None = None) -> None:
# airflow.task would redirect to a file, but here we want it to propagate
# up to the normal airflow handler.

settings.MASK_SECRETS_IN_LOGS = True
from airflow.sdk._shared.secrets_masker import SecretsMasker

SecretsMasker.enable_log_masking()

handlers = logging.getLogger("airflow.task").handlers
already_has_stream_handler = False
@@ -404,6 +405,9 @@
task, args.map_index, logical_date_or_run_id=args.logical_date_or_run_id, create_if_necessary="db"
)
try:
# TODO: move bulk of this logic into the SDK: https://github.com/apache/airflow/issues/54658
from airflow.sdk._shared.secrets_masker import RedactedIO

with redirect_stdout(RedactedIO()):
_run_task(ti=ti, task=task, run_triggerer=True)
if ti.state == State.FAILED and args.post_mortem:
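`airflow tasks test` now turns on log masking via the masker itself rather than flipping `settings.MASK_SECRETS_IN_LOGS`, and still wraps stdout in `RedactedIO`. A minimal sketch of that stdout-redaction pattern, using the import paths that appear in this diff (treat them as assumptions until they ship in a release):

```python
# Minimal sketch of the RedactedIO pattern used above; import paths come from
# this PR's diff and may differ in a released Airflow.
from contextlib import redirect_stdout

from airflow.sdk._shared.secrets_masker import RedactedIO
from airflow.sdk.log import mask_secret

mask_secret("hunter2")  # register the secret with the masker
with redirect_stdout(RedactedIO()):
    print("the password is hunter2")  # expected to print "the password is ***"
```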
(next file; file name not shown)
@@ -59,7 +59,10 @@ def triggerer_run(skip_serve_logs: bool, capacity: int, triggerer_heartrate: flo
@providers_configuration_loaded
def triggerer(args):
"""Start Airflow Triggerer."""
settings.MASK_SECRETS_IN_LOGS = True
from airflow.sdk._shared.secrets_masker import SecretsMasker

SecretsMasker.enable_log_masking()

print(settings.HEADER)
triggerer_heartrate = conf.getfloat("triggerer", "JOB_HEARTBEAT_SEC")

(next file; file name not shown)
@@ -71,22 +71,22 @@
},
},
"filters": {
"mask_secrets": {
"()": "airflow.sdk.execution_time.secrets_masker.SecretsMasker",
"mask_secrets_core": {
"()": "airflow._shared.secrets_masker._secrets_masker",
},
},
"handlers": {
"console": {
"class": "airflow.utils.log.logging_mixin.RedirectStdHandler",
"formatter": "airflow_coloured",
"stream": "sys.stdout",
"filters": ["mask_secrets"],
"filters": ["mask_secrets_core"],
},
"task": {
"class": "airflow.utils.log.file_task_handler.FileTaskHandler",
"formatter": "airflow",
"base_log_folder": BASE_LOG_FOLDER,
"filters": ["mask_secrets"],
"filters": ["mask_secrets_core"],
},
},
"loggers": {
@@ -95,7 +95,7 @@
"level": LOG_LEVEL,
# Set to true here (and reset via set_context) so that if no file is configured we still get logs!
"propagate": True,
"filters": ["mask_secrets"],
"filters": ["mask_secrets_core"],
},
"flask_appbuilder": {
"handlers": ["console"],
@@ -106,7 +106,7 @@
"root": {
"handlers": ["console"],
"level": LOG_LEVEL,
"filters": ["mask_secrets"],
"filters": ["mask_secrets_core"],
},
}

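The logging filters now point at `airflow._shared.secrets_masker._secrets_masker`, i.e. a factory that returns the shared masker instance, instead of instantiating `SecretsMasker` by class path. Below is a self-contained, stdlib-only illustration of that dictConfig `"()"` factory pattern; `MaskingFilter` and `_masking_filter` are hypothetical names, not Airflow's implementation:

```python
# Stdlib-only illustration of wiring a masking filter through a dictConfig "()"
# factory, mirroring the config above. Names here are hypothetical.
import logging
import logging.config


class MaskingFilter(logging.Filter):
    def __init__(self, secrets=("s3cr3t",)):
        super().__init__()
        self.secrets = secrets

    def filter(self, record):
        msg = record.getMessage()
        for secret in self.secrets:
            msg = msg.replace(secret, "***")
        record.msg, record.args = msg, None  # rewrite the record in place
        return True


def _masking_filter():
    # Factory referenced via the "()" key; could return a shared singleton.
    return MaskingFilter()


logging.config.dictConfig(
    {
        "version": 1,
        "filters": {"mask_secrets_core": {"()": _masking_filter}},
        "handlers": {
            "console": {
                "class": "logging.StreamHandler",
                "filters": ["mask_secrets_core"],
            }
        },
        "root": {"handlers": ["console"], "level": "INFO"},
    }
)

logging.getLogger().info("token=s3cr3t")  # emitted as "token=***"
```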
6 changes: 4 additions & 2 deletions airflow-core/src/airflow/configuration.py
@@ -860,7 +860,8 @@ def _create_future_warning(name: str, section: str, current_value: Any, new_valu
)

def mask_secrets(self):
from airflow.sdk.execution_time.secrets_masker import mask_secret
from airflow._shared.secrets_masker import mask_secret as mask_secret_core
from airflow.sdk.log import mask_secret as mask_secret_sdk

for section, key in self.sensitive_config_values:
try:
@@ -873,7 +874,8 @@ def mask_secrets(self):
key,
)
continue
mask_secret(value)
mask_secret_core(value)
mask_secret_sdk(value)

def _env_var_name(self, section: str, key: str) -> str:
return f"{ENV_VAR_PREFIX}{section.replace('.', '_').upper()}__{key.upper()}"
4 changes: 0 additions & 4 deletions airflow-core/src/airflow/dag_processing/manager.py
@@ -253,10 +253,6 @@ def run(self):
By processing them in separate processes, we can get parallelism and isolation
from potentially harmful user code.
"""
from airflow.sdk.execution_time.secrets_masker import reset_secrets_masker

reset_secrets_masker()

self.register_exit_signals()

self.log.info("Processing files using up to %s processes at a time ", self._parallelism)
3 changes: 2 additions & 1 deletion airflow-core/src/airflow/dag_processing/processor.py
@@ -430,7 +430,8 @@ def _handle_request(self, msg: ToManager, log: FilteringBoundLogger, req_id: int
resp = dagrun_result
dump_opts = {"exclude_unset": True}
elif isinstance(msg, MaskSecret):
from airflow.sdk.execution_time.secrets_masker import mask_secret
# Use sdk masker in dag processor and triggerer because those use the task sdk machinery
from airflow.sdk.log import mask_secret

mask_secret(msg.value, msg.name)
else:
11 changes: 6 additions & 5 deletions airflow-core/src/airflow/jobs/triggerer_job_runner.py
@@ -436,6 +436,11 @@ def _handle_request(self, msg: ToTriggerSupervisor, log: FilteringBoundLogger, r
elif isinstance(msg, GetVariable):
var = self.client.variables.get(msg.key)
if isinstance(var, VariableResponse):
# TODO: call for help to figure out why this is needed
if var.value:
from airflow.sdk.log import mask_secret

mask_secret(var.value, var.key)
var_result = VariableResult.from_variable_response(var)
resp = var_result
dump_opts = {"exclude_unset": True}
@@ -504,7 +509,7 @@ def _handle_request(self, msg: ToTriggerSupervisor, log: FilteringBoundLogger, r
api_resp = self.client.hitl.get_detail_response(ti_id=msg.ti_id)
resp = HITLDetailResponseResult.from_api_response(response=api_resp)
elif isinstance(msg, MaskSecret):
from airflow.sdk.execution_time.secrets_masker import mask_secret
from airflow.sdk.log import mask_secret

mask_secret(msg.value, msg.name)
else:
@@ -514,10 +519,6 @@ def _handle_request(self, msg: ToTriggerSupervisor, log: FilteringBoundLogger, r

def run(self) -> None:
"""Run synchronously and handle all database reads/writes."""
from airflow.sdk.execution_time.secrets_masker import reset_secrets_masker

reset_secrets_masker()

while not self.stop:
if not self.is_alive():
log.error("Trigger runner process has died! Exiting.")
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/models/connection.py
@@ -30,12 +30,12 @@
from sqlalchemy import Boolean, Column, Integer, String, Text
from sqlalchemy.orm import declared_attr, reconstructor, synonym

from airflow._shared.secrets_masker import mask_secret
from airflow.configuration import ensure_secrets_loaded
from airflow.exceptions import AirflowException, AirflowNotFoundException
from airflow.models.base import ID_LEN, Base
from airflow.models.crypto import get_fernet
from airflow.sdk import SecretCache
from airflow.sdk.execution_time.secrets_masker import mask_secret
from airflow.utils.helpers import prune_dict
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.module_loading import import_string
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/models/renderedtifields.py
@@ -145,7 +145,7 @@ def __repr__(self):
return prefix + ">"

def _redact(self):
from airflow.sdk.execution_time.secrets_masker import redact
from airflow._shared.secrets_masker import redact

if self.k8s_pod_yaml:
self.k8s_pod_yaml = redact(self.k8s_pod_yaml)
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/models/variable.py
@@ -28,11 +28,11 @@
from sqlalchemy.dialects.mysql import MEDIUMTEXT
from sqlalchemy.orm import declared_attr, reconstructor, synonym

from airflow._shared.secrets_masker import mask_secret
from airflow.configuration import ensure_secrets_loaded
from airflow.models.base import ID_LEN, Base
from airflow.models.crypto import get_fernet
from airflow.sdk import SecretCache
from airflow.sdk.execution_time.secrets_masker import mask_secret
from airflow.secrets.metastore import MetastoreBackend
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import create_session
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/serialization/helpers.py
@@ -20,8 +20,8 @@

from typing import Any

from airflow._shared.secrets_masker import redact
from airflow.configuration import conf
from airflow.sdk.execution_time.secrets_masker import redact
from airflow.settings import json


6 changes: 1 addition & 5 deletions airflow-core/src/airflow/settings.py
@@ -372,7 +372,7 @@ def _configure_async_session() -> None:

def configure_orm(disable_connection_pool=False, pool_class=None):
"""Configure ORM using SQLAlchemy."""
from airflow.sdk.execution_time.secrets_masker import mask_secret
from airflow._shared.secrets_masker import mask_secret

if _is_sqlite_db_path_relative(SQL_ALCHEMY_CONN):
from airflow.exceptions import AirflowConfigException
@@ -717,10 +717,6 @@ def initialize():

HIDE_SENSITIVE_VAR_CONN_FIELDS = conf.getboolean("core", "hide_sensitive_var_conn_fields")

# By default this is off, but is automatically configured on when running task
# instances
MASK_SECRETS_IN_LOGS = False

# Prefix used to identify tables holding data moved during migration.
AIRFLOW_MOVED_TABLE_PREFIX = "_airflow_moved"

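With the module-level `MASK_SECRETS_IN_LOGS` flag removed from `airflow.settings`, components opt in through the masker itself, as the CLI changes earlier in this diff do. The replacement, per this PR (import path and method name are assumptions until released):

```python
# Replaces the former `settings.MASK_SECRETS_IN_LOGS = True` toggle, per this
# PR's diff; the import path and classmethod are not yet in a released Airflow.
from airflow.sdk._shared.secrets_masker import SecretsMasker

SecretsMasker.enable_log_masking()
```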
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/utils/cli.py
@@ -34,11 +34,11 @@
from typing import TYPE_CHECKING, TypeVar, cast

from airflow import settings
from airflow._shared.secrets_masker import should_hide_value_for_key
from airflow._shared.timezones import timezone
from airflow.dag_processing.bundles.manager import DagBundlesManager
from airflow.exceptions import AirflowException
from airflow.sdk.definitions._internal.dag_parsing_context import _airflow_parsing_context_manager
from airflow.sdk.execution_time.secrets_masker import should_hide_value_for_key
from airflow.utils import cli_action_loggers
from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler
from airflow.utils.platform import getuser, is_terminal_support_colors
6 changes: 5 additions & 1 deletion airflow-core/tests/unit/cli/commands/test_task_command.py
@@ -173,8 +173,12 @@ def test_test_filters_secrets(self, capsys):

Output should be filtered by SecretsMasker.
"""
# TODO: revisit during https://github.com/apache/airflow/issues/54658
from airflow.sdk.log import mask_secret

password = "somepassword1234!"
logging.getLogger("airflow.task").filters[0].add_mask(password)
mask_secret(password)

args = self.parser.parse_args(
["tasks", "test", "example_python_operator", "print_the_context", "2018-01-01"],
)
22 changes: 10 additions & 12 deletions airflow-core/tests/unit/core/test_configuration.py
@@ -1891,21 +1891,19 @@ def test_config_paths_is_directory(self):
new_callable=lambda: [("mysection1", "mykey1"), ("mysection2", "mykey2")],
)
def test_mask_conf_values(self, mock_sensitive_config_values):
from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS

target = (
"airflow.sdk.execution_time.secrets_masker.mask_secret"
if AIRFLOW_V_3_0_PLUS
else "airflow.utils.log.secrets_masker.mask_secret"
)

with patch(target) as mock_mask_secret:
with (
patch("airflow._shared.secrets_masker.mask_secret") as mock_mask_secret_core,
patch("airflow.sdk.log.mask_secret") as mock_mask_secret_sdk,
):
conf.mask_secrets()

mock_mask_secret.assert_any_call("supersecret1")
mock_mask_secret.assert_any_call("supersecret2")
mock_mask_secret_core.assert_any_call("supersecret1")
mock_mask_secret_core.assert_any_call("supersecret2")
assert mock_mask_secret_core.call_count == 2

assert mock_mask_secret.call_count == 2
mock_mask_secret_sdk.assert_any_call("supersecret1")
mock_mask_secret_sdk.assert_any_call("supersecret2")
assert mock_mask_secret_sdk.call_count == 2


@conf_vars({("core", "unit_test_mode"): "False"})