Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,12 @@ repos:
language: python
pass_filenames: false
files: ^shared/.*$|^.*/pyproject.toml$|^.*/_shared/.*$
- id: check-secrets-search-path-sync
name: Check sync between sdk and core
entry: ./scripts/ci/prek/check_secrets_search_path_sync.py
language: python
pass_filenames: false
files: ^airflow-core/src/airflow/secrets/base_secrets\.py$|^task-sdk/src/airflow/sdk/execution_time/secrets/__init__\.py$
- id: ruff
name: Run 'ruff' for extremely fast Python linting
description: "Run 'ruff' for extremely fast Python linting"
Expand Down
11 changes: 4 additions & 7 deletions airflow-core/src/airflow/secrets/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,10 @@

__all__ = ["BaseSecretsBackend", "DEFAULT_SECRETS_SEARCH_PATH"]

from airflow.secrets.base_secrets import BaseSecretsBackend

DEFAULT_SECRETS_SEARCH_PATH = [
"airflow.secrets.environment_variables.EnvironmentVariablesBackend",
"airflow.secrets.metastore.MetastoreBackend",
]

from airflow.secrets.base_secrets import (
DEFAULT_SECRETS_SEARCH_PATH as DEFAULT_SECRETS_SEARCH_PATH,
BaseSecretsBackend,
)

__deprecated_classes = {
"cache": {
Expand Down
6 changes: 6 additions & 0 deletions airflow-core/src/airflow/secrets/base_secrets.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,9 @@

# Re export for compat
from airflow._shared.secrets_backend.base import BaseSecretsBackend as BaseSecretsBackend

# Server side default secrets backend search path used by server components (scheduler, API server)
DEFAULT_SECRETS_SEARCH_PATH = [
"airflow.secrets.environment_variables.EnvironmentVariablesBackend",
"airflow.secrets.metastore.MetastoreBackend",
]
89 changes: 89 additions & 0 deletions scripts/ci/prek/check_secrets_search_path_sync.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import ast
import sys
from pathlib import Path

AIRFLOW_ROOT = Path(__file__).parents[3].resolve()
CORE_SECRETS_FILE = AIRFLOW_ROOT / "airflow-core" / "src" / "airflow" / "secrets" / "base_secrets.py"
SDK_SECRETS_FILE = (
AIRFLOW_ROOT / "task-sdk" / "src" / "airflow" / "sdk" / "execution_time" / "secrets" / "__init__.py"
)


def extract_from_file(file_path: Path, constant_name: str) -> list[str] | None:
"""Extract a list constant value from a Python file using AST parsing."""
try:
with open(file_path) as f:
tree = ast.parse(f.read(), filename=str(file_path))

for node in ast.walk(tree):
if isinstance(node, ast.Assign):
for target in node.targets:
if isinstance(target, ast.Name) and target.id == constant_name:
if isinstance(node.value, ast.List):
values = []
for elt in node.value.elts:
if isinstance(elt, ast.Constant):
values.append(elt.value)
return values
return None
except Exception as e:
print(f"Error parsing {file_path}: {e}", file=sys.stderr)
return None


def main() -> None:
# Extract DEFAULT_SECRETS_SEARCH_PATH from airflow-core
core_path = extract_from_file(CORE_SECRETS_FILE, "DEFAULT_SECRETS_SEARCH_PATH")
if core_path is None:
print(
f"ERROR: Could not extract DEFAULT_SECRETS_SEARCH_PATH from {CORE_SECRETS_FILE}",
file=sys.stderr,
)
sys.exit(1)

# Extract _SERVER_DEFAULT_SECRETS_SEARCH_PATH from task-sdk
sdk_path = extract_from_file(SDK_SECRETS_FILE, "_SERVER_DEFAULT_SECRETS_SEARCH_PATH")
if sdk_path is None:
print(
f"ERROR: Could not extract _SERVER_DEFAULT_SECRETS_SEARCH_PATH from {SDK_SECRETS_FILE}",
file=sys.stderr,
)
sys.exit(1)

if core_path == sdk_path:
sys.exit(0)
else:
print("\nERROR: Secrets search paths are not synchronized!", file=sys.stderr)
print(
"\nThe DEFAULT_SECRETS_SEARCH_PATH in airflow-core and "
"_SERVER_DEFAULT_SECRETS_SEARCH_PATH in task-sdk must match.",
file=sys.stderr,
)
print("\nPlease update either:", file=sys.stderr)
print(f" - {CORE_SECRETS_FILE}", file=sys.stderr)
print(f" - {SDK_SECRETS_FILE}", file=sys.stderr)
sys.exit(1)


if __name__ == "__main__":
main()
15 changes: 6 additions & 9 deletions task-sdk/src/airflow/sdk/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

from airflow.sdk import yaml
from airflow.sdk._shared.configuration.parser import AirflowConfigParser as _SharedAirflowConfigParser
from airflow.secrets import DEFAULT_SECRETS_SEARCH_PATH
from airflow.sdk.execution_time.secrets import _SERVER_DEFAULT_SECRETS_SEARCH_PATH

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -185,7 +185,7 @@ def get_custom_secret_backend(worker_mode: bool = False):


def initialize_secrets_backends(
default_backends: list[str] = DEFAULT_SECRETS_SEARCH_PATH,
default_backends: list[str] = _SERVER_DEFAULT_SECRETS_SEARCH_PATH,
):
"""
Initialize secrets backend.
Expand All @@ -201,10 +201,7 @@ def initialize_secrets_backends(
worker_mode = False
# Determine worker mode - if default_backends is not the server default, it's worker mode
# This is a simplified check; in practice, worker mode is determined by the caller
if default_backends != [
"airflow.secrets.environment_variables.EnvironmentVariablesBackend",
"airflow.secrets.metastore.MetastoreBackend",
]:
if default_backends != _SERVER_DEFAULT_SECRETS_SEARCH_PATH:
worker_mode = True

custom_secret_backend = get_custom_secret_backend(worker_mode)
Expand All @@ -220,7 +217,7 @@ def initialize_secrets_backends(


def ensure_secrets_loaded(
default_backends: list[str] = DEFAULT_SECRETS_SEARCH_PATH,
default_backends: list[str] = _SERVER_DEFAULT_SECRETS_SEARCH_PATH,
) -> list:
"""
Ensure that all secrets backends are loaded.
Expand All @@ -230,9 +227,9 @@ def ensure_secrets_loaded(
# Check if the secrets_backend_list contains only 2 default backends.

# Check if we are loading the backends for worker too by checking if the default_backends is equal
# to DEFAULT_SECRETS_SEARCH_PATH.
# to _SERVER_DEFAULT_SECRETS_SEARCH_PATH.
secrets_backend_list = initialize_secrets_backends()
if len(secrets_backend_list) == 2 or default_backends != DEFAULT_SECRETS_SEARCH_PATH:
if len(secrets_backend_list) == 2 or default_backends != _SERVER_DEFAULT_SECRETS_SEARCH_PATH:
return initialize_secrets_backends(default_backends=default_backends)
return secrets_backend_list

Expand Down
7 changes: 7 additions & 0 deletions task-sdk/src/airflow/sdk/execution_time/secrets/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@

__all__ = ["ExecutionAPISecretsBackend", "DEFAULT_SECRETS_SEARCH_PATH_WORKERS"]

# Server-side default secrets search path (for comparison/detection only)
# This matches what airflow-core uses but is defined here to avoid importing from core
_SERVER_DEFAULT_SECRETS_SEARCH_PATH = [
"airflow.secrets.environment_variables.EnvironmentVariablesBackend",
"airflow.secrets.metastore.MetastoreBackend",
]

DEFAULT_SECRETS_SEARCH_PATH_WORKERS = [
"airflow.secrets.environment_variables.EnvironmentVariablesBackend",
"airflow.sdk.execution_time.secrets.execution_api.ExecutionAPISecretsBackend",
Expand Down
6 changes: 3 additions & 3 deletions task-sdk/tests/task_sdk/execution_time/test_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -910,12 +910,12 @@ def test_execution_api_backend_in_worker_chain(self):

def test_metastore_backend_in_server_chain(self):
"""Test that MetastoreBackend is in the API server search path."""
from airflow.secrets import DEFAULT_SECRETS_SEARCH_PATH
from airflow.sdk.execution_time.secrets import _SERVER_DEFAULT_SECRETS_SEARCH_PATH

assert "airflow.secrets.metastore.MetastoreBackend" in DEFAULT_SECRETS_SEARCH_PATH
assert "airflow.secrets.metastore.MetastoreBackend" in _SERVER_DEFAULT_SECRETS_SEARCH_PATH
assert (
"airflow.sdk.execution_time.secrets.execution_api.ExecutionAPISecretsBackend"
not in DEFAULT_SECRETS_SEARCH_PATH
not in _SERVER_DEFAULT_SECRETS_SEARCH_PATH
)

def test_get_connection_uses_backend_chain(self, mock_supervisor_comms):
Expand Down