Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1613,6 +1613,7 @@ repos:
^airflow-core/src/airflow/operators/subdag\.py$|
^airflow-core/src/airflow/plugins_manager\.py$|
^airflow-core/src/airflow/providers_manager\.py$|
^airflow-core/src/airflow/secrets/__init__\.py$|
^airflow-core/src/airflow/serialization/definitions/[_a-z]+\.py$|
^airflow-core/src/airflow/serialization/enums\.py$|
^airflow-core/src/airflow/serialization/helpers\.py$|
Expand Down
49 changes: 49 additions & 0 deletions airflow-core/newsfragments/56583.significant.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
Fix Connection & Variable access in API server contexts (plugins, log handlers)

Previously, hooks used in API server contexts (plugins, middlewares, log handlers) would fail with an ``ImportError``
for ``SUPERVISOR_COMMS``, because ``SUPERVISOR_COMMS`` only exists in task runner child processes.

This has been fixed by implementing automatic context detection with three separate secrets backend chains:

**Context Detection:**

1. **Client contexts** (task runner in worker): Detected via ``SUPERVISOR_COMMS`` presence
2. **Server contexts** (API server, scheduler): Explicitly marked with ``_AIRFLOW_PROCESS_CONTEXT=server`` environment variable
3. **Fallback contexts** (supervisor, unknown contexts): Neither marker present, uses minimal safe chain

**Backend Chains:**

- **Client**: ``EnvironmentVariablesBackend`` → ``ExecutionAPISecretsBackend`` (routes to Execution API via ``SUPERVISOR_COMMS``)
- **Server**: ``EnvironmentVariablesBackend`` → ``MetastoreBackend`` (direct database access)
- **Fallback**: ``EnvironmentVariablesBackend`` only (+ external backends from config like AWS Secrets Manager, Vault)

The fallback chain is crucial for supervisor processes (worker-side, before task runner starts) which need to access
external secrets for remote logging setup but should not use ``MetastoreBackend`` (to maintain worker isolation).

**Architecture Benefits:**

- Workers (supervisor + task runner) never use ``MetastoreBackend``, maintaining strict isolation
- External secrets backends (AWS Secrets Manager, Vault, etc.) work in all three contexts
- Supervisor falls back to Execution API client for connections not found in external backends
- API server and scheduler have direct database access for optimal performance

**Impact:**

- Hooks like ``GCSHook``, ``S3Hook`` now work correctly in log handlers and plugins
- No code changes required for existing plugins or hooks
- Workers remain isolated from direct database access (network-level DB blocking fully supported)
- External secrets work everywhere (workers, supervisor, API server)
- Robust handling of unknown contexts with safe minimal chain

See: `#56120 <https://github.com/apache/airflow/issues/56120>`__, `#56583 <https://github.com/apache/airflow/issues/56583>`__, `#51816 <https://github.com/apache/airflow/issues/51816>`__

* Types of change

* [ ] Dag changes
* [ ] Config changes
* [ ] API changes
* [ ] CLI changes
* [ ] Behaviour changes
* [ ] Plugin changes
* [ ] Dependency changes
* [ ] Code interface changes
4 changes: 4 additions & 0 deletions airflow-core/src/airflow/api_fastapi/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@

import os

# Mark this as a server context before any airflow imports
# This ensures plugins loaded at import time get the correct secrets backend chain
os.environ["_AIRFLOW_PROCESS_CONTEXT"] = "server"

from airflow.api_fastapi.app import cached_app

# There is no way to pass the apps to this file from Airflow CLI
Expand Down
5 changes: 5 additions & 0 deletions airflow-core/src/airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1020,6 +1020,11 @@ def set_ti_span_attrs(cls, span, state, ti):
span.add_event(name="airflow.task.ended", timestamp=datetime_to_nano(ti.end_date))

def _execute(self) -> int | None:
import os

# Mark this as a server context for secrets backend detection
os.environ["_AIRFLOW_PROCESS_CONTEXT"] = "server"

self.log.info("Starting the scheduler")

reset_signals = self.register_signals()
Expand Down
29 changes: 24 additions & 5 deletions airflow-core/src/airflow/secrets/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

from airflow.utils.deprecation_tools import add_deprecated_classes

__all__ = ["BaseSecretsBackend", "DEFAULT_SECRETS_SEARCH_PATH", "DEFAULT_SECRETS_SEARCH_PATH_WORKERS"]
__all__ = ["BaseSecretsBackend", "DEFAULT_SECRETS_SEARCH_PATH"]

from airflow.secrets.base_secrets import BaseSecretsBackend

Expand All @@ -38,14 +38,33 @@
"airflow.secrets.metastore.MetastoreBackend",
]

DEFAULT_SECRETS_SEARCH_PATH_WORKERS = [
"airflow.secrets.environment_variables.EnvironmentVariablesBackend",
]


__deprecated_classes = {
"cache": {
"SecretCache": "airflow.sdk.execution_time.cache.SecretCache",
},
}
add_deprecated_classes(__deprecated_classes, __name__)


def __getattr__(name):
    """
    Resolve module attributes that are no longer defined directly in this module.

    Only ``DEFAULT_SECRETS_SEARCH_PATH_WORKERS`` is handled: accessing it emits a
    ``DeprecationWarning`` pointing at its Task SDK location and returns the
    value imported from there. If that import fails, a hard-coded list with just
    the environment-variables backend is returned instead.

    :param name: Name of the attribute being looked up on this module.
    :return: The worker secrets search path (a list of backend class paths).
    :raises AttributeError: For any other attribute name.
    """
    # Guard clause: everything except the one relocated constant is unknown here.
    if name != "DEFAULT_SECRETS_SEARCH_PATH_WORKERS":
        raise AttributeError(f"module '{__name__}' has no attribute '{name}'")

    import warnings

    # stacklevel=2 attributes the warning to the code that accessed the attribute.
    warnings.warn(
        "airflow.secrets.DEFAULT_SECRETS_SEARCH_PATH_WORKERS is moved to the Task SDK. "
        "Use airflow.sdk.execution_time.secrets.DEFAULT_SECRETS_SEARCH_PATH_WORKERS instead.",
        DeprecationWarning,
        stacklevel=2,
    )

    try:
        from airflow.sdk.execution_time.secrets import (
            DEFAULT_SECRETS_SEARCH_PATH_WORKERS as search_path,
        )
    except (ImportError, AttributeError):
        # Back-compat for older Task SDK clients
        search_path = [
            "airflow.secrets.environment_variables.EnvironmentVariablesBackend",
        ]
    return search_path
6 changes: 4 additions & 2 deletions airflow-core/tests/unit/core/test_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
write_default_airflow_configuration_if_needed,
)
from airflow.providers_manager import ProvidersManager
from airflow.secrets import DEFAULT_SECRETS_SEARCH_PATH_WORKERS
from airflow.sdk.execution_time.secrets import DEFAULT_SECRETS_SEARCH_PATH_WORKERS

from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.markers import skip_if_force_lowest_dependencies_marker
Expand Down Expand Up @@ -923,8 +923,10 @@ def test_initialize_secrets_backends_on_workers(self):
backends = initialize_secrets_backends(DEFAULT_SECRETS_SEARCH_PATH_WORKERS)
backend_classes = [backend.__class__.__name__ for backend in backends]

assert len(backends) == 2
assert len(backends) == 3
assert "SystemsManagerParameterStoreBackend" in backend_classes
assert "EnvironmentVariablesBackend" in backend_classes
assert "ExecutionAPISecretsBackend" in backend_classes

@skip_if_force_lowest_dependencies_marker
@conf_vars(
Expand Down
103 changes: 40 additions & 63 deletions task-sdk/src/airflow/sdk/execution_time/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ def _get_connection(conn_id: str) -> Connection:
except SecretCache.NotPresentException:
pass # continue to backends

# iterate over configured backends if not in cache (or expired)
# Iterate over configured backends (which may include ExecutionAPISecretsBackend
# in worker contexts or MetastoreBackend in API server contexts)
backends = ensure_secrets_backend_loaded()
for secrets_backend in backends:
try:
Expand All @@ -165,26 +166,10 @@ def _get_connection(conn_id: str) -> Connection:
type(secrets_backend).__name__,
)

if backends:
log.debug(
"Connection not found in any of the configured Secrets Backends. Trying to retrieve from API server",
conn_id=conn_id,
)

# TODO: This should probably be moved to a separate module like `airflow.sdk.execution_time.comms`
# or `airflow.sdk.execution_time.connection`
# A reason to not move it to `airflow.sdk.execution_time.comms` is that it
# will make that module depend on Task SDK, which is not ideal because we intend to
# keep Task SDK as a separate package than execution time mods.
# Also applies to _async_get_connection.
from airflow.sdk.execution_time.comms import GetConnection
from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS

msg = SUPERVISOR_COMMS.send(GetConnection(conn_id=conn_id))
# If no backend found the connection, raise an error
from airflow.exceptions import AirflowNotFoundException

conn = _process_connection_result_conn(msg)
SecretCache.save_connection_uri(conn_id, conn.get_uri())
return conn
raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined")


async def _async_get_connection(conn_id: str) -> Connection:
Expand All @@ -201,34 +186,36 @@ async def _async_get_connection(conn_id: str) -> Connection:
_mask_connection_secrets(conn)
return conn
except SecretCache.NotPresentException:
pass # continue to API
pass # continue to backends

from airflow.sdk.execution_time.comms import GetConnection
from airflow.sdk.execution_time.supervisor import ensure_secrets_backend_loaded
from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS

# Try secrets backends first using async wrapper
# Try secrets backends
backends = ensure_secrets_backend_loaded()
for secrets_backend in backends:
try:
conn = await sync_to_async(secrets_backend.get_connection)(conn_id) # type: ignore[assignment]
# Use async method if available, otherwise wrap sync method
if hasattr(secrets_backend, "aget_connection"):
conn = await secrets_backend.aget_connection(conn_id) # type: ignore[assignment]
else:
conn = await sync_to_async(secrets_backend.get_connection)(conn_id) # type: ignore[assignment]

if conn:
# TODO: this should probably be in get conn
if conn.password:
mask_secret(conn.password)
if conn.extra:
mask_secret(conn.extra)
SecretCache.save_connection_uri(conn_id, conn.get_uri())
_mask_connection_secrets(conn)
return conn
except Exception:
# If one backend fails, try the next one
continue
log.exception(
"Unable to retrieve connection from secrets backend (%s). "
"Checking subsequent secrets backend.",
type(secrets_backend).__name__,
)

# If no backend found the connection, raise an error
from airflow.exceptions import AirflowNotFoundException

# If no secrets backend has the connection, fall back to API server
msg = await SUPERVISOR_COMMS.asend(GetConnection(conn_id=conn_id))
conn = _process_connection_result_conn(msg)
SecretCache.save_connection_uri(conn_id, conn.get_uri())
_mask_connection_secrets(conn)
return conn
raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined")


def _get_variable(key: str, deserialize_json: bool) -> Any:
Expand All @@ -250,7 +237,8 @@ def _get_variable(key: str, deserialize_json: bool) -> Any:
pass # Continue to check backends

backends = ensure_secrets_backend_loaded()
# iterate over backends if not in cache (or expired)

# Iterate over backends if not in cache (or expired)
for secrets_backend in backends:
try:
var_val = secrets_backend.get_variable(key=key)
Expand All @@ -270,31 +258,13 @@ def _get_variable(key: str, deserialize_json: bool) -> Any:
type(secrets_backend).__name__,
)

if backends:
log.debug(
"Variable not found in any of the configured Secrets Backends. Trying to retrieve from API server",
key=key,
)

# TODO: This should probably be moved to a separate module like `airflow.sdk.execution_time.comms`
# or `airflow.sdk.execution_time.variable`
# A reason to not move it to `airflow.sdk.execution_time.comms` is that it
# will make that module depend on Task SDK, which is not ideal because we intend to
# keep Task SDK as a separate package than execution time mods.
from airflow.sdk.execution_time.comms import ErrorResponse, GetVariable
from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS

msg = SUPERVISOR_COMMS.send(GetVariable(key=key))

if isinstance(msg, ErrorResponse):
raise AirflowRuntimeError(msg)
# If no backend found the variable, raise a not found error (mirrors _get_connection)
from airflow.sdk.exceptions import AirflowRuntimeError, ErrorType
from airflow.sdk.execution_time.comms import ErrorResponse

if TYPE_CHECKING:
assert isinstance(msg, VariableResult)
variable = _convert_variable_result_to_variable(msg, deserialize_json)
# Save raw value to ensure cache consistency regardless of deserialize_json parameter
SecretCache.save_variable(key, msg.value)
return variable.value
raise AirflowRuntimeError(
ErrorResponse(error=ErrorType.VARIABLE_NOT_FOUND, detail={"message": f"Variable {key} not found"})
)


def _set_variable(key: str, value: Any, description: str | None = None, serialize_json: bool = False) -> None:
Expand All @@ -307,18 +277,21 @@ def _set_variable(key: str, value: Any, description: str | None = None, serializ

from airflow.sdk.execution_time.cache import SecretCache
from airflow.sdk.execution_time.comms import PutVariable
from airflow.sdk.execution_time.secrets.execution_api import ExecutionAPISecretsBackend
from airflow.sdk.execution_time.supervisor import ensure_secrets_backend_loaded
from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS

# check for write conflicts on the worker
for secrets_backend in ensure_secrets_backend_loaded():
if isinstance(secrets_backend, ExecutionAPISecretsBackend):
continue
try:
var_val = secrets_backend.get_variable(key=key)
if var_val is not None:
_backend_name = type(secrets_backend).__name__
log.warning(
"The variable %s is defined in the %s secrets backend, which takes "
"precedence over reading from the database. The value in the database will be "
"precedence over reading from the API Server. The value from the API Server will be "
"updated, but to read it you have to delete the conflicting variable "
"from %s",
key,
Expand Down Expand Up @@ -379,12 +352,16 @@ def __eq__(self, other):
return True

def get(self, conn_id: str, default_conn: Any = None) -> Any:
    """
    Look up a connection by id, returning ``default_conn`` when it is not found.

    :param conn_id: Identifier of the connection to retrieve.
    :param default_conn: Value returned when the connection cannot be found.
    :return: The result of ``_get_connection(conn_id)``, or ``default_conn``.
    :raises AirflowRuntimeError: When the lookup fails with any error other
        than ``ErrorType.CONNECTION_NOT_FOUND``.
    """
    from airflow.exceptions import AirflowNotFoundException

    try:
        connection = _get_connection(conn_id)
    except AirflowRuntimeError as err:
        # Only a "connection not found" runtime error maps to the default;
        # every other runtime error is propagated to the caller unchanged.
        if err.error.error != ErrorType.CONNECTION_NOT_FOUND:
            raise
        return default_conn
    except AirflowNotFoundException:
        return default_conn
    return connection


class VariableAccessor:
Expand Down
29 changes: 29 additions & 0 deletions task-sdk/src/airflow/sdk/execution_time/secrets/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Secrets backends for task execution contexts."""

from __future__ import annotations

from airflow.sdk.execution_time.secrets.execution_api import ExecutionAPISecretsBackend

__all__ = ["ExecutionAPISecretsBackend", "DEFAULT_SECRETS_SEARCH_PATH_WORKERS"]

# Default secrets backend class paths for worker processes: the environment
# variables backend followed by the Execution API secrets backend.
# NOTE(review): presumably backends are consulted in list order - confirm
# against the code that loads this search path.
DEFAULT_SECRETS_SEARCH_PATH_WORKERS = [
    "airflow.secrets.environment_variables.EnvironmentVariablesBackend",
    "airflow.sdk.execution_time.secrets.execution_api.ExecutionAPISecretsBackend",
]
Loading
Loading