Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 22 additions & 98 deletions airflow-core/src/airflow/io/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,102 +16,26 @@
# under the License.
from __future__ import annotations

import inspect
import logging
from collections.abc import Callable, Mapping
from functools import cache
from typing import (
TYPE_CHECKING,
from airflow.utils.deprecation_tools import add_deprecated_classes

# Keep the legacy ``airflow.io`` import paths resolvable by mapping every old name to its
# new ``airflow.sdk`` location.
add_deprecated_classes(
{
# Names that are imported directly from this package.
__name__: {
"get_fs": "airflow.sdk.io.get_fs",
"has_fs": "airflow.sdk.io.has_fs",
"attach": "airflow.sdk.io.attach",
"Properties": "airflow.sdk.io.Properties",
"_BUILTIN_SCHEME_TO_FS": "airflow.sdk.io.fs._BUILTIN_SCHEME_TO_FS",
},
# Names that are imported from submodules of this package, keyed by submodule name.
"path": {
"ObjectStoragePath": "airflow.sdk.ObjectStoragePath",
},
"storage": {
"attach": "airflow.sdk.io.attach",
},
"typedef": {
"Properties": "airflow.sdk.io.typedef.Properties",
},
},
package=__name__,
)

from fsspec.implementations.local import LocalFileSystem

from airflow.providers_manager import ProvidersManager
from airflow.stats import Stats
from airflow.utils.module_loading import import_string

if TYPE_CHECKING:
from fsspec import AbstractFileSystem

from airflow.io.typedef import Properties


log = logging.getLogger(__name__)


def _file(_: str | None, storage_options: Properties) -> LocalFileSystem:
    """
    Create a local filesystem.

    :param _: connection id; not used by this factory
    :param storage_options: keyword arguments forwarded to ``LocalFileSystem``
    :return: a new ``LocalFileSystem`` instance
    """
    local_fs = LocalFileSystem(**storage_options)
    return local_fs


# Filesystems supported out of the box: both schemes resolve to the same local factory.
_BUILTIN_SCHEME_TO_FS: dict[str, Callable[[str | None, Properties], AbstractFileSystem]] = dict.fromkeys(
    ("file", "local"), _file
)


@cache
def _register_filesystems() -> Mapping[
    str,
    Callable[[str | None, Properties], AbstractFileSystem] | Callable[[str | None], AbstractFileSystem],
]:
    """
    Build the scheme-to-factory mapping from the builtins and the provider modules.

    Starts from a copy of ``_BUILTIN_SCHEME_TO_FS`` and, for every module name listed in
    ``ProvidersManager().filesystem_module_names``, maps each entry of the module's
    ``schemes`` attribute to the module's ``get_fs`` attribute. The function is wrapped
    in ``cache``, so the mapping is computed once and reused on later calls.

    :return: mapping of scheme to the callable that creates the filesystem
    :raises ImportError: if a module lists a scheme but has no ``get_fs`` attribute
    """
    registry = dict(_BUILTIN_SCHEME_TO_FS)
    with Stats.timer("airflow.io.load_filesystems") as timer:
        for module_path in ProvidersManager().filesystem_module_names:
            provider_module = import_string(module_path)
            for scheme in getattr(provider_module, "schemes", []):
                # A later registration wins; the override is only reported, not prevented.
                if scheme in registry:
                    log.warning("Overriding scheme %s for %s", scheme, module_path)

                factory = getattr(provider_module, "get_fs", None)
                if factory is None:
                    raise ImportError(f"Filesystem {module_path} does not have a get_fs method")
                registry[scheme] = factory

    log.debug("loading filesystems from providers took %.3f seconds", timer.duration)
    return registry


def get_fs(
    scheme: str, conn_id: str | None = None, storage_options: Properties | None = None
) -> AbstractFileSystem:
    """
    Get a filesystem by scheme.

    :param scheme: the scheme to get the filesystem for
    :param conn_id: the airflow connection id to use
    :param storage_options: the storage options to pass to the filesystem
    :return: the filesystem created by the factory registered for the scheme
    :raises ValueError: if no filesystem is registered for the scheme
    :raises AttributeError: if storage options are passed but the registered factory
        only accepts a connection id
    """
    filesystems = _register_filesystems()
    try:
        fs = filesystems[scheme]
    except KeyError:
        raise ValueError(f"No filesystem registered for scheme {scheme}") from None

    options = storage_options or {}

    # MyPy does not recognize dynamic parameters inspection when we call the method, and we have to do
    # it for compatibility reasons with already released providers, that's why we need to ignore
    # mypy errors here
    parameters = inspect.signature(fs).parameters
    if len(parameters) == 1:
        # Single-parameter factories take only the connection id, so options cannot be forwarded.
        if options:
            raise AttributeError(
                f"Filesystem {scheme} does not support storage options, but options were passed. "
                "This most likely means that you are using an old version of the provider that does not "
                "support storage options. Please upgrade the provider if possible."
            )
        return fs(conn_id)  # type: ignore[call-arg]
    return fs(conn_id, options)  # type: ignore[call-arg]


def has_fs(scheme: str) -> bool:
    """
    Tell whether a filesystem is registered for a scheme.

    :param scheme: the scheme to look up
    :return: True when the scheme has a registered filesystem, False otherwise
    """
    registered = _register_filesystems()
    return scheme in registered
22 changes: 0 additions & 22 deletions airflow-core/src/airflow/io/path.py

This file was deleted.

22 changes: 0 additions & 22 deletions airflow-core/src/airflow/io/storage.py

This file was deleted.

114 changes: 84 additions & 30 deletions airflow-core/src/airflow/utils/deprecation_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,51 +84,105 @@ def add_deprecated_classes(
Add deprecated attribute PEP-563 imports and warnings modules to the package.

Works for classes, functions, variables, and other module attributes.

:param module_imports: imports to use
:param package: package name
:param override_deprecated_classes: override target attributes with deprecated ones. If module +
target attribute is found in the dictionary, it will be displayed in the warning message.
Supports both creating virtual modules and modifying existing modules.

:param module_imports: imports to use. Format: dict[str, dict[str, str]]
- Keys are module names (creates virtual modules)
- Special key __name__ modifies the current module for direct attribute imports
- Can mix both approaches in a single call
:param package: package name (typically __name__)
:param override_deprecated_classes: override target attributes with deprecated ones.
Format: dict[str, dict[str, str]] matching the structure of module_imports
:param extra_message: extra message to display in the warning or import error message

Example:
Examples:
# Create virtual modules (e.g., for removed .py files)
add_deprecated_classes(
{"basenotifier": {"BaseNotifier": "airflow.sdk.bases.notifier.BaseNotifier"}},
package=__name__,
)

This makes 'from airflow.notifications.basenotifier import BaseNotifier' still work,
even if 'basenotifier.py' was removed, and shows a warning with the new path.

Wildcard Example:
# Wildcard support - redirect all attributes to new module
add_deprecated_classes(
{"timezone": {"*": "airflow.sdk.timezone"}},
package=__name__,
)

This makes 'from airflow.utils.timezone import utc' redirect to 'airflow.sdk.timezone.utc',
# Current module direct imports
add_deprecated_classes(
{
__name__: {
"get_fs": "airflow.sdk.io.fs.get_fs",
"has_fs": "airflow.sdk.io.fs.has_fs",
}
},
package=__name__,
)

# Mixed behavior - both current module and submodule attributes
add_deprecated_classes(
{
__name__: {
"get_fs": "airflow.sdk.io.fs.get_fs",
"has_fs": "airflow.sdk.io.fs.has_fs",
"Properties": "airflow.sdk.io.typedef.Properties",
},
"typedef": {
"Properties": "airflow.sdk.io.typedef.Properties",
}
},
package=__name__,
)

The first example makes 'from airflow.notifications.basenotifier import BaseNotifier' work
even if 'basenotifier.py' was removed.

The second example makes 'from airflow.utils.timezone import utc' redirect to 'airflow.sdk.timezone.utc',
allowing any attribute from the deprecated module to be accessed from the new location.

Note that the `add_deprecated_classes` method should be called in the `__init__.py` file of the package
where the deprecated classes are located - this way the module `.py` files can be removed and what
remains in the package is just the `__init__.py` file.
The third example makes 'from airflow.io import get_fs' work with direct imports from the current module.

See for example `airflow/decorators/__init__.py` file.
The fourth example handles both direct imports from the current module and submodule imports.
"""
# Handle both current module and virtual module deprecations
for module_name, imports in module_imports.items():
full_module_name = f"{package}.{module_name}"
module_type = ModuleType(full_module_name)
if override_deprecated_classes and module_name in override_deprecated_classes:
override_deprecated_classes_for_module = override_deprecated_classes[module_name]
if module_name == package:
# Special case: modify the current module for direct attribute imports
if package not in sys.modules:
raise ValueError(f"Module {package} not found in sys.modules")

module = sys.modules[package]

# Create the __getattr__ function for current module
current_override = {}
if override_deprecated_classes and package in override_deprecated_classes:
current_override = override_deprecated_classes[package]

getattr_func = functools.partial(
getattr_with_deprecation,
imports,
package,
current_override,
extra_message or "",
)

# Set the __getattr__ function on the current module
setattr(module, "__getattr__", getattr_func)
else:
override_deprecated_classes_for_module = {}

# Mypy is not able to derive the right function signature https://github.com/python/mypy/issues/2427
module_type.__getattr__ = functools.partial( # type: ignore[assignment]
getattr_with_deprecation,
imports,
full_module_name,
override_deprecated_classes_for_module,
extra_message or "",
)
sys.modules.setdefault(full_module_name, module_type)
# Create virtual modules for submodule imports
full_module_name = f"{package}.{module_name}"
module_type = ModuleType(full_module_name)
if override_deprecated_classes and module_name in override_deprecated_classes:
override_deprecated_classes_for_module = override_deprecated_classes[module_name]
else:
override_deprecated_classes_for_module = {}

# Mypy is not able to derive the right function signature https://github.com/python/mypy/issues/2427
module_type.__getattr__ = functools.partial( # type: ignore[assignment]
getattr_with_deprecation,
imports,
full_module_name,
override_deprecated_classes_for_module,
extra_message or "",
)
sys.modules.setdefault(full_module_name, module_type)
Loading