@amoghrajesh
Contributor

@amoghrajesh amoghrajesh commented Aug 13, 2025

closes: #54397

Why

As part of #51545, we are trying to avoid code duplication between airflow-core and task-sdk. The secrets masking functionality is widely used by both, which makes it a good candidate for a shared library. This follows the pattern set by the timezone utilities migration (#53149) and ensures a single source of truth for secrets masking logic across the Airflow codebase.

High level changes made

1. Created shared library structure

  • New directory: shared/secrets_masker/ with standard shared library layout
  • Project configuration: shared/secrets_masker/pyproject.toml defining apache-airflow-shared-secrets_masker
  • Source code: Moved from task-sdk/src/airflow/sdk/execution_time/secrets_masker.py to shared/secrets_masker/src/airflow_shared/secrets_masker/secrets_masker.py

2. Updated consuming packages

  • Workspace configuration: Added shared/secrets_masker to root pyproject.toml workspace members
  • Public API modules: Created task-sdk/src/airflow/sdk/secrets_masker.py that re-exports shared functionality
  • Import statements: Updated imports across airflow-core and task-sdk to use new shared paths
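
The re-export idea in the list above can be pictured with a minimal sketch. It uses in-memory stand-in modules (`shared_impl` and `public_api` are hypothetical names, not Airflow modules) and only illustrates that a public module re-exporting a name exposes the very same class object as the shared one. It is not the actual content of task-sdk/src/airflow/sdk/secrets_masker.py.

```python
import sys
import types

# Hypothetical stand-in for the shared implementation module.
shared_impl = types.ModuleType("shared_impl")
exec(
    "class SecretsMasker:\n"
    "    '''Placeholder class used only for this illustration.'''\n",
    shared_impl.__dict__,
)
sys.modules["shared_impl"] = shared_impl

# Hypothetical stand-in for the public module that re-exports it.
public_api = types.ModuleType("public_api")
exec(
    "from shared_impl import SecretsMasker\n"
    "__all__ = ['SecretsMasker']\n",
    public_api.__dict__,
)
sys.modules["public_api"] = public_api

from public_api import SecretsMasker

# Both names refer to one class object, so there is no duplicated logic.
same_object = SecretsMasker is shared_impl.SecretsMasker
print(same_object)
```

Because the public module only forwards the name, callers can keep a stable import path while the implementation lives in one place.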

3. How the updates were made:

  • Pre-commit hooks: Leveraged existing check-shared-distributions-structure and check-shared-distributions-usage hooks
  • Symlinks: Automatic creation of symlinks from airflow/_shared/secrets_masker and airflow/sdk/_shared/secrets_masker
  • Force-include: Automated addition to pyproject.toml files for wheel packaging

Sanity tests

Symlink verification

# Verify symlinks exist and point to correct locations
(airflow) ➜  airflow git:(move-secrets-masker-to-shared) ✗ ls -la airflow-core/src/airflow/_shared/secrets_masker
ls -la task-sdk/src/airflow/sdk/_shared/secrets_masker
lrwxr-xr-x@ 1 amoghdesai  staff  67 Aug 13 15:59 airflow-core/src/airflow/_shared/secrets_masker -> ../../../../shared/secrets_masker/src/airflow_shared/secrets_masker
lrwxr-xr-x@ 1 amoghdesai  staff  70 Aug 13 15:59 task-sdk/src/airflow/sdk/_shared/secrets_masker -> ../../../../../shared/secrets_masker/src/airflow_shared/secrets_masker


# Check symlink targets
(airflow) ➜  airflow git:(move-secrets-masker-to-shared) ✗ readlink airflow-core/src/airflow/_shared/secrets_masker
readlink task-sdk/src/airflow/sdk/_shared/secrets_masker
../../../../shared/secrets_masker/src/airflow_shared/secrets_masker
../../../../../shared/secrets_masker/src/airflow_shared/secrets_masker
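
The same kind of check can be scripted. The sketch below is self-contained: it builds a throwaway directory layout (the `pkg`, `pkg_shared`, and `consumer` names are made up for illustration) and verifies that a relative symlink resolves to the expected directory. It assumes a POSIX-like system where creating symlinks needs no extra privileges.

```python
import os
import tempfile
from pathlib import Path


def check_symlink(link: Path, expected_target: Path) -> bool:
    """Return True if `link` is a symlink that resolves to `expected_target`."""
    return link.is_symlink() and link.resolve() == expected_target.resolve()


# Build a throwaway layout that mimics the relative-link pattern.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    target = root / "shared" / "pkg" / "src" / "pkg_shared"
    target.mkdir(parents=True)
    link_parent = root / "consumer" / "src" / "_shared"
    link_parent.mkdir(parents=True)
    link = link_parent / "pkg"
    # Use a relative target so the link survives moving the whole checkout.
    link.symlink_to(os.path.relpath(target, link_parent), target_is_directory=True)
    raw_target = os.readlink(link)  # what `readlink` would print
    link_ok = check_symlink(link, target)

print(raw_target, link_ok)
```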

Shared library test in isolation

# Test shared library works independently
(airflow) ➜  airflow git:(move-secrets-masker-to-shared) ✗ cd shared/secrets_masker
PYTHONPATH=src python -c "
from airflow_shared.secrets_masker.secrets_masker import SecretsMasker
filt = SecretsMasker()
filt.add_mask('password')
print('✅ Shared library works in isolation')
"
✅ Shared library works in isolation
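
For readers unfamiliar with the idea, here is a rough sketch of what a masking log filter does conceptually. `SimpleMaskingFilter` is a hypothetical class written for this illustration using only the standard library; it is not the shared library's SecretsMasker and makes no claim about how that class is implemented.

```python
from __future__ import annotations

import io
import logging
import re


class SimpleMaskingFilter(logging.Filter):
    """Illustrative filter: replaces registered secrets with '***'."""

    def __init__(self) -> None:
        super().__init__()
        self._secrets: set[str] = set()
        self._pattern: re.Pattern[str] | None = None

    def add_mask(self, secret: str) -> None:
        if secret:
            self._secrets.add(secret)
            # Longest first so overlapping secrets are fully replaced.
            ordered = sorted(self._secrets, key=len, reverse=True)
            self._pattern = re.compile("|".join(re.escape(s) for s in ordered))

    def filter(self, record: logging.LogRecord) -> bool:
        if self._pattern is not None:
            # Render the message first, then redact the final text.
            record.msg = self._pattern.sub("***", record.getMessage())
            record.args = None
        return True


stream = io.StringIO()
handler = logging.StreamHandler(stream)
masker = SimpleMaskingFilter()
masker.add_mask("testpassword123")
handler.addFilter(masker)

logger = logging.getLogger("masking_demo")
logger.setLevel(logging.INFO)
logger.propagate = False
logger.addHandler(handler)
logger.info("password is %s", "testpassword123")

masked_output = stream.getvalue()
print(masked_output)
```

Attaching the filter to the handler rather than to one logger means every record routed through that handler is redacted, whichever logger emitted it.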

Import validation

# Test imports work from consuming packages
(airflow) ➜  secrets_masker git:(move-secrets-masker-to-shared) ✗ python -c "from airflow.sdk.secrets_masker import SecretsMasker; print('✅ task-sdk import works')"
python -c "from airflow._shared.secrets_masker.secrets_masker import SecretsMasker; print('✅ airflow-core import works')"
✅ task-sdk import works
✅ airflow-core import works

Some tests for peace of mind

DAG used for testing some cases:

from __future__ import annotations

import logging

from airflow import DAG
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.operators.python import PythonOperator
from airflow.sdk import Variable, Connection


def my_function() -> None:
    conn = Connection.get("my_connection")
    logging.getLogger(__name__).info(conn.password)
    print("via print", conn.password)


with DAG("subprocess_dag") as dag:
    start = EmptyOperator(task_id="start")

    py_func = PythonOperator(task_id="py_func", python_callable=my_function)

    end = EmptyOperator(task_id="end")

    start >> py_func >> end

Env set:

export AIRFLOW_CONN_MY_CONNECTION="mysql://testuser:testpassword123@localhost:3306/testdb"
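
To make it clear which part of this URI is the value expected to be masked, here is a small sketch that splits it with the standard library's generic URL parser. This is not Airflow's own connection parsing; the dictionary keys are chosen to mirror the usual connection field names purely for readability.

```python
from urllib.parse import urlsplit

uri = "mysql://testuser:testpassword123@localhost:3306/testdb"
parts = urlsplit(uri)

# Generic URL components, labelled with connection-style field names.
conn_fields = {
    "conn_type": parts.scheme,
    "login": parts.username,
    "password": parts.password,
    "host": parts.hostname,
    "port": parts.port,
    "schema": parts.path.lstrip("/"),
}
print(conn_fields["password"])
```

The `password` component is the value that `conn.password` returns in the DAG above, and therefore the string that should never appear in the task logs.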
  1. Running a task through CLI: airflow tasks test subprocess_dag py_fu
[2025-08-21T07:52:11.216+0000] {dag.py:2329} INFO - created dagrun <DagRun subprocess_dag @ None: __airflow_temporary_run_2025-08-21T07:52:11.207333+00:00__, state:running, queued_at: None. run_type: manual>
[2025-08-21T07:52:11.222+0000] {dag.py:1293} INFO - [DAG TEST] starting task_id=py_func map_index=-1
[2025-08-21T07:52:11.680+0000] {dag.py:1296} INFO - [DAG TEST] running task <TaskInstance: subprocess_dag.py_func __airflow_temporary_run_2025-08-21T07:52:11.207333+00:00__ [None]>
2025-08-21 07:52:12 [debug    ] Starting task instance run     hostname=e2cc5b7dc023 pid=450 ti_id=0198cb9d-28d2-7800-82b0-7161dc72f022 unixname=root
2025-08-21 07:52:12 [debug    ] Retrieved task instance details dag_id=subprocess_dag state=queued task_id=py_func ti_id=0198cb9d-28d2-7800-82b0-7161dc72f022
2025-08-21 07:52:12 [info     ] Task started                   hostname=e2cc5b7dc023 previous_state=queued ti_id=0198cb9d-28d2-7800-82b0-7161dc72f022
2025-08-21 07:52:12 [info     ] Task instance state updated    rows_affected=1 ti_id=0198cb9d-28d2-7800-82b0-7161dc72f022
[2025-08-21T07:52:12.497+0000] {_client.py:1025} INFO - HTTP Request: PATCH http://in-process.invalid./task-instances/0198cb9d-28d2-7800-82b0-7161dc72f022/run "HTTP/1.1 200 OK"
2025-08-21 07:52:12 [debug    ] Sending request                msg=SetRenderedFields(rendered_fields={'templates_dict': None, 'op_args': [], 'op_kwargs': {}}, type='SetRenderedFields')
2025-08-21 07:52:12 [debug    ] Received message from task runner [supervisor] msg=SetRenderedFields(rendered_fields={'templates_dict': None, 'op_args': [], 'op_kwargs': {}}, type='SetRenderedFields')
2025-08-21 07:52:12 [info     ] Updating RenderedTaskInstanceFields field_count=3 ti_id=0198cb9d-28d2-7800-82b0-7161dc72f022
2025-08-21 07:52:12 [debug    ] RenderedTaskInstanceFields updated successfully ti_id=0198cb9d-28d2-7800-82b0-7161dc72f022
[2025-08-21T07:52:12.509+0000] {_client.py:1025} INFO - HTTP Request: PUT http://in-process.invalid./task-instances/0198cb9d-28d2-7800-82b0-7161dc72f022/rtif "HTTP/1.1 201 Created"
2025-08-21 07:52:12 [debug    ] Sending request                msg=MaskSecret(value='***', name=None, type='MaskSecret')
2025-08-21 07:52:12 [debug    ] Received message from task runner (body omitted) [supervisor] msg=<class 'airflow.sdk.execution_time.comms.MaskSecret'>
[2025-08-21T07:52:12.513+0000] {subprocess_mask_var.py:13} INFO - ***
via print ***
[2025-08-21T07:52:12.513+0000] {python.py:218} INFO - Done. Returned value was: None
2025-08-21 07:52:12 [debug    ] Sending request                msg=SucceedTask(state='success', end_date=datetime.datetime(2025, 8, 21, 7, 52, 12, 513546, tzinfo=datetime.timezone.utc), task_outlets=[], outlet_events=[], rendered_map_index=None, type='SucceedTask')
2025-08-21 07:52:12 [debug    ] Received message from task runner [supervisor] msg=SucceedTask(state='success', end_date=datetime.datetime(2025, 8, 21, 7, 52, 12, 513546, tzinfo=datetime.timezone.utc), task_outlets=[], outlet_events=[], rendered_map_index=None, type='SucceedTask')
2025-08-21 07:52:12 [debug    ] Updating task instance state   new_state=success ti_id=0198cb9d-28d2-7800-82b0-7161dc72f022
2025-08-21 07:52:12 [debug    ] Retrieved current task instance state max_tries=0 previous_state=running ti_id=0198cb9d-28d2-7800-82b0-7161dc72f022 try_number=0
2025-08-21 07:52:12 [info     ] Task instance state updated    new_state=success rows_affected=1 ti_id=0198cb9d-28d2-7800-82b0-7161dc72f022
[2025-08-21T07:52:12.522+0000] {_client.py:1025} INFO - HTTP Request: PATCH http://in-process.invalid./task-instances/0198cb9d-28d2-7800-82b0-7161dc72f022/state "HTTP/1.1 204 No Content"
2025-08-21 07:52:12 [debug    ] Running finalizers             [task] ti=RuntimeTaskInstance(id=UUID('0198cb9d-28d2-7800-82b0-7161dc72f022'), task_id='py_func', dag_id='subprocess_dag', run_id='__airflow_temporary_run_2025-08-21T07:52:11.207333+00:00__', try_number=0, dag_version_id=UUID('0198cb9b-d3d6-74f3-a561-083d897ed8cd'), map_index=-1, hostname='e2cc5b7dc023', context_carrier=None, task=<Task(PythonOperator): py_func>, max_tries=0, start_date=datetime.datetime(2025, 8, 21, 7, 52, 11, 880733, tzinfo=datetime.timezone.utc), end_date=datetime.datetime(2025, 8, 21, 7, 52, 12, 513546, tzinfo=datetime.timezone.utc), state=<TaskInstanceState.SUCCESS: 'success'>, is_mapped=False, rendered_map_index=None, log_url=None)

This is specifically to test the recent issue: 1f4c55c

  2. Running a trigger with a DAG that logs through the DAG processor, the root logger, structlog, etc.

Trigger:

from __future__ import annotations

import asyncio
import logging

import structlog

from airflow.sdk import Connection, Variable
from airflow.sdk.log import mask_secret
from airflow.triggers.base import BaseTrigger, TriggerEvent


print(f"{__file__=} loaded")


class CustomTrigger(BaseTrigger):
    async def run(self, **args):
        from asgiref.sync import sync_to_async

        await sync_to_async(Variable.set)("my_api_key", "password1")
        x = await sync_to_async(Variable.get)("my_api_key")

        logging.getLogger(__name__).info("my_api_key=%s", x)
        secret = "some-secret-value"
        await sync_to_async(mask_secret)(secret)
        await sync_to_async(mask_secret)("some-secret-value")

        logging.getLogger(__name__).info("after manual mask %s", secret)

        await structlog.get_logger().ainfo("Testing structlog", val=secret, api_key="abcdef", x=x)

        yield TriggerEvent({"Hi": "from trigger"})

    def serialize(self):
        return (
            f"{type(self).__module__}.{type(self).__qualname__}",
            {},
        )

DAG:

from airflow.exceptions import AirflowRescheduleException, TaskDeferred
from airflow.sdk import dag, task
from airflow.sdk import Variable
import logging

x = Variable.get("toplevel_api_key", default="secret_api")
print(f"{x=}")
logging.root.info("toplevel=%s", x)

@dag()
def trigger_a_gag():
    @task
    def trigger(event=None) -> None:
        if event:
            print(event)
        else:
            from triggera import CustomTrigger
            raise TaskDeferred(trigger=CustomTrigger(), method_name="execute")

    trigger()


trigger_a_gag()

Logs:

[2025-08-21, 13:24:55] INFO - __file__='/files/plugins/triggera.py' loaded: chan="stdout": source="task"
[2025-08-21, 13:24:55] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-08-21, 13:24:55] INFO - Filling up the DagBag from /files/dags/reschedule-dag.py: source="airflow.models.dagbag.DagBag"
[2025-08-21, 13:24:55] INFO - toplevel=***: source="root"
[2025-08-21, 13:24:55] INFO - x='***': chan="stdout": source="task"
[2025-08-21, 13:24:55] INFO - Pausing task as DEFERRED. : dag_id="trigger_a_gag": task_id="trigger": run_id="manual__2025-08-21T07:54:54.321189+00:00": source="task"
[2025-08-21, 13:24:57] INFO - trigger trigger_a_gag/manual__2025-08-21T07:54:54.321189+00:00/trigger/-1/1 (ID 1) starting
[2025-08-21, 13:24:59] INFO - my_api_key=***: source="triggera"
[2025-08-21, 13:24:59] INFO - after manual mask ***: source="triggera"
[2025-08-21, 13:24:59] INFO - Testing structlog: val="***": api_key="***": x="***"
[2025-08-21, 13:24:59] INFO - Trigger fired event: name="trigger_a_gag/manual__2025-08-21T07:54:54.321189+00:00/trigger/-1/1 (ID 1)": result="TriggerEvent<{'Hi': 'from trigger'}>"
[2025-08-21, 13:24:59] INFO - trigger completed: name="trigger_a_gag/manual__2025-08-21T07:54:54.321189+00:00/trigger/-1/1 (ID 1)"
[2025-08-21, 13:25:00] INFO - __file__='/files/plugins/triggera.py' loaded: chan="stdout": source="task"
[2025-08-21, 13:25:00] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-08-21, 13:25:00] INFO - Filling up the DagBag from /files/dags/reschedule-dag.py: source="airflow.models.dagbag.DagBag"
[2025-08-21, 13:25:00] INFO - toplevel=***: source="root"
[2025-08-21, 13:25:00] INFO - x='***': chan="stdout": source="task"
[2025-08-21, 13:25:00] ERROR - Task failed with exception: source="task"
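
The structlog line in the logs above shows values being masked both because they were registered as secrets (`val`, `x`) and because the key name itself looks sensitive (`api_key`). The sketch below illustrates that two-rule idea in plain Python. The `redact` helper and the `SENSITIVE_KEY_PARTS` tuple are assumptions made for this example, not the masker's real API or its real list of sensitive names.

```python
from __future__ import annotations

from typing import Any

# Assumed list of sensitive key fragments, for illustration only.
SENSITIVE_KEY_PARTS = ("password", "secret", "api_key", "token")


def redact(value: Any, known_secrets: set[str], key: str | None = None) -> Any:
    """Recursively replace sensitive values with '***'."""
    # Rule 1: a sensitive-looking key name hides the whole value.
    if key is not None and any(part in key.lower() for part in SENSITIVE_KEY_PARTS):
        return "***"
    if isinstance(value, dict):
        return {k: redact(v, known_secrets, str(k)) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return type(value)(redact(v, known_secrets) for v in value)
    # Rule 2: registered secrets are replaced wherever they appear in text.
    if isinstance(value, str):
        for secret in known_secrets:
            value = value.replace(secret, "***")
    return value


event = {"val": "some-secret-value", "api_key": "abcdef", "x": "password1"}
redacted = redact(event, known_secrets={"some-secret-value", "password1"})
print(redacted)
```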

TODO next

  • Update occurrences of these imports in providers
  • Update these occurrences in docs if any
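
One possible way to locate such imports is a small AST scan. This is only a sketch: the old module path is inferred from the source file location given in the description, the sample source is made up, and the helper name `find_old_imports` is hypothetical. It does not catch every form, for example `from airflow.sdk.execution_time import secrets_masker`.

```python
from __future__ import annotations

import ast

# Inferred from task-sdk/src/airflow/sdk/execution_time/secrets_masker.py.
OLD_MODULE = "airflow.sdk.execution_time.secrets_masker"


def find_old_imports(source: str) -> list[int]:
    """Return line numbers of import statements that reference OLD_MODULE."""
    hits = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.ImportFrom) and node.module == OLD_MODULE:
            hits.append(node.lineno)
        elif isinstance(node, ast.Import):
            if any(alias.name == OLD_MODULE for alias in node.names):
                hits.append(node.lineno)
    return sorted(hits)


# Made-up sample source; it is parsed, never executed.
sample = (
    "import logging\n"
    "from airflow.sdk.execution_time.secrets_masker import mask_secret\n"
    "import airflow.sdk.execution_time.secrets_masker as sm\n"
)
old_import_lines = find_old_imports(sample)
print(old_import_lines)
```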

@amoghrajesh
Contributor Author

@kaxil / @potiuk thanks for your reviews, I have handled all / most of your comments, could you take a look when possible?

@potiuk added the CI job for this too

@amoghrajesh amoghrajesh requested review from kaxil and potiuk August 22, 2025 13:41
Member

@potiuk potiuk left a comment

Wonderful!

Member

@kaxil kaxil left a comment

Some comments to be addressed but high-level is fine

@amoghrajesh
Contributor Author

Good lord, that was a big one, with changes in many areas including pre-commits etc! Thanks for your reviews @potiuk and @kaxil! The tooling is awesome (even more so with the improvements now) for shared library bootstrapping! Merging this; I will handle the open comments in #54859 and resolve them soon.

@amoghrajesh amoghrajesh merged commit 7982542 into apache:main Aug 23, 2025
194 checks passed
@amoghrajesh amoghrajesh deleted the move-secrets-masker-to-shared branch August 23, 2025 05:18
@potiuk
Member

potiuk commented Aug 23, 2025

Indeed. I am very happy with the tooling :). And with #54863 we have a small common test framework for the shared distributions as well :)

Development

Successfully merging this pull request may close these issues.

Create a shared library for secrets_masker and move the module there