Skip to content

Commit

Permalink
completed D400 for multiple folders
Browse files Browse the repository at this point in the history
  • Loading branch information
bdsoha committed Nov 18, 2022
1 parent f513268 commit f8fa843
Show file tree
Hide file tree
Showing 10 changed files with 137 additions and 110 deletions.
5 changes: 3 additions & 2 deletions airflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@
# specific language governing permissions and limitations
# under the License.
"""
Init setup.
Authentication is implemented using flask_login and different environments can
implement their own login mechanisms by providing an `airflow_login` module
in their PYTHONPATH. airflow_login should be based off the
`airflow.www.login`
in their PYTHONPATH. airflow_login should be based off the `airflow.www.login`
isort:skip_file
"""
Expand Down
4 changes: 2 additions & 2 deletions airflow/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Main executable module"""
"""Main executable module."""
from __future__ import annotations

import os
Expand All @@ -29,7 +29,7 @@


def main():
"""Main executable function"""
"""Main executable function."""
if conf.get("core", "security") == "kerberos":
os.environ["KRB5CCNAME"] = conf.get("kerberos", "ccache")
os.environ["KRB5_KTNAME"] = conf.get("kerberos", "keytab")
Expand Down
4 changes: 2 additions & 2 deletions airflow/logging_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@


def configure_logging():
"""Configure & Validate Airflow Logging"""
"""Configure & Validate Airflow Logging."""
logging_class_path = ""
try:
logging_class_path = conf.get("logging", "logging_config_class")
Expand Down Expand Up @@ -79,7 +79,7 @@ def configure_logging():


def validate_logging_config(logging_config):
"""Validate the provided Logging Config"""
"""Validate the provided Logging Config."""
# Now lets validate the other logging-related settings
task_log_reader = conf.get("logging", "task_log_reader")

Expand Down
19 changes: 10 additions & 9 deletions airflow/plugins_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@
registered_operator_link_classes: dict[str, type] | None = None
registered_ti_dep_classes: dict[str, type] | None = None
timetable_classes: dict[str, type[Timetable]] | None = None
"""Mapping of class names to class of OperatorLinks registered by plugins.
"""
Mapping of class names to class of OperatorLinks registered by plugins.
Used by the DAG serialization code to only allow specific classes to be created
during deserialization
Expand Down Expand Up @@ -184,8 +185,7 @@ def on_load(cls, *args, **kwargs):

def is_valid_plugin(plugin_obj):
"""
Check whether a potential object is a subclass of
the AirflowPlugin class.
Check whether a potential object is a subclass of the AirflowPlugin class.
:param plugin_obj: potential subclass of AirflowPlugin
:return: Whether or not the obj is a valid subclass of
Expand All @@ -205,7 +205,7 @@ def is_valid_plugin(plugin_obj):

def register_plugin(plugin_instance):
"""
Start plugin load and register it after success initialization
Start plugin load and register it after success initialization.
:param plugin_instance: subclass of AirflowPlugin
"""
Expand Down Expand Up @@ -239,7 +239,7 @@ def load_entrypoint_plugins():


def load_plugins_from_plugin_directory():
"""Load and register Airflow Plugins from plugins directory"""
"""Load and register Airflow Plugins from plugins directory."""
global import_errors
log.debug("Loading plugins from directory: %s", settings.PLUGINS_FOLDER)

Expand Down Expand Up @@ -317,7 +317,7 @@ def ensure_plugins_loaded():


def initialize_web_ui_plugins():
"""Collect extension points for WEB UI"""
"""Collect extension points for WEB UI."""
global plugins
global flask_blueprints
global flask_appbuilder_views
Expand Down Expand Up @@ -357,7 +357,7 @@ def initialize_web_ui_plugins():


def initialize_ti_deps_plugins():
"""Creates modules for loaded extension from custom task instance dependency rule plugins"""
"""Creates modules for loaded extension from custom task instance dependency rule plugins."""
global registered_ti_dep_classes
if registered_ti_dep_classes is not None:
return
Expand All @@ -378,7 +378,7 @@ def initialize_ti_deps_plugins():


def initialize_extra_operators_links_plugins():
"""Creates modules for loaded extension from extra operators links plugins"""
"""Creates modules for loaded extension from extra operators links plugins."""
global global_operator_extra_links
global operator_extra_links
global registered_operator_link_classes
Expand Down Expand Up @@ -492,6 +492,7 @@ def integrate_macros_plugins() -> None:


def integrate_listener_plugins(listener_manager: ListenerManager) -> None:
"""Add listeners from plugins."""
global plugins

ensure_plugins_loaded()
Expand All @@ -507,7 +508,7 @@ def integrate_listener_plugins(listener_manager: ListenerManager) -> None:

def get_plugin_info(attrs_to_dump: Iterable[str] | None = None) -> list[dict[str, Any]]:
"""
Dump plugins attributes
Dump plugins attributes.
:param attrs_to_dump: A list of plugin attributes to dump
"""
Expand Down
81 changes: 49 additions & 32 deletions airflow/providers_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@

def _ensure_prefix_for_placeholders(field_behaviors: dict[str, Any], conn_type: str):
"""
If the given field_behaviors dict contains a placeholders node, and there
Verify the correct placeholder prefix.
If the given field_behaviors dict contains a placeholder's node, and there
are placeholders for extra fields (i.e. anything other than the built-in conn
attrs), and if those extra fields are unprefixed, then add the prefix.
Expand Down Expand Up @@ -83,6 +85,8 @@ def ensure_prefix(field):

class LazyDictWithCache(MutableMapping):
"""
Lazy-loaded cached dictionary.
Dictionary, which in case you set callable, executes the passed callable with `key` attribute
at first use - and returns and caches the result.
"""
Expand Down Expand Up @@ -125,7 +129,7 @@ def __contains__(self, key):


def _create_provider_info_schema_validator():
"""Creates JSON schema validator from the provider_info.schema.json"""
"""Creates JSON schema validator from the provider_info.schema.json."""
import jsonschema

with resource_files("airflow").joinpath("provider_info.schema.json").open("rb") as f:
Expand All @@ -136,7 +140,7 @@ def _create_provider_info_schema_validator():


def _create_customized_form_field_behaviours_schema_validator():
"""Creates JSON schema validator from the customized_form_field_behaviours.schema.json"""
"""Creates JSON schema validator from the customized_form_field_behaviours.schema.json."""
import jsonschema

with resource_files("airflow").joinpath("customized_form_field_behaviours.schema.json").open("rb") as f:
Expand All @@ -163,7 +167,7 @@ def _check_builtin_provider_prefix(provider_package: str, class_name: str) -> bo
@dataclass
class ProviderInfo:
"""
Provider information
Provider information.
:param version: version string
:param data: dictionary with information about the provider
Expand All @@ -185,14 +189,14 @@ def __post_init__(self):


class HookClassProvider(NamedTuple):
"""Hook class and Provider it comes from"""
"""Hook class and Provider it comes from."""

hook_class_name: str
package_name: str


class HookInfo(NamedTuple):
"""Hook information"""
"""Hook information."""

hook_class_name: str
connection_id_attribute_name: str
Expand All @@ -203,7 +207,7 @@ class HookInfo(NamedTuple):


class ConnectionFormWidgetInfo(NamedTuple):
"""Connection Form Widget information"""
"""Connection Form Widget information."""

hook_class_name: str
package_name: str
Expand All @@ -217,6 +221,7 @@ class ConnectionFormWidgetInfo(NamedTuple):


def log_debug_import_from_sources(class_name, e, provider_package):
"""Log debug imports from sources."""
log.debug(
"Optional feature disabled on exception when importing '%s' from '%s' package",
class_name,
Expand All @@ -226,6 +231,7 @@ def log_debug_import_from_sources(class_name, e, provider_package):


def log_optional_feature_disabled(class_name, e, provider_package):
"""Log optional feature disabled."""
log.debug(
"Optional feature disabled on exception when importing '%s' from '%s' package",
class_name,
Expand All @@ -240,6 +246,7 @@ def log_optional_feature_disabled(class_name, e, provider_package):


def log_import_warning(class_name, e, provider_package):
"""Log import warning."""
log.warning(
"Exception when importing '%s' from '%s' package",
class_name,
Expand Down Expand Up @@ -312,6 +319,8 @@ def _sanity_check(
# So we add our own decorator
def provider_info_cache(cache_name: str) -> Callable[[T], T]:
"""
Decorate and cache provider info.
Decorator factory that create decorator that caches initialization of provider's parameters
:param cache_name: Name of the cache
"""
Expand Down Expand Up @@ -339,7 +348,9 @@ def wrapped_function(*args, **kwargs):

class ProvidersManager(LoggingMixin):
"""
Manages all provider packages. This is a Singleton class. The first time it is
Manages all provider packages.
This is a Singleton class. The first time it is
instantiated, it discovers all available providers in installed packages and
local source folders (if airflow is run from sources).
"""
Expand Down Expand Up @@ -446,11 +457,13 @@ def initialize_providers_auth_backends(self):

def _discover_all_providers_from_packages(self) -> None:
"""
Discovers all providers by scanning packages installed. The list of providers should be returned
via the 'apache_airflow_provider' entrypoint as a dictionary conforming to the
'airflow/provider_info.schema.json' schema. Note that the schema is different at runtime
than provider.yaml.schema.json. The development version of provider schema is more strict and changes
together with the code. The runtime version is more relaxed (allows for additional properties)
Discovers all providers by scanning packages installed.
The list of providers should be returned via the 'apache_airflow_provider'
entrypoint as a dictionary conforming to the 'airflow/provider_info.schema.json'
schema. Note that the schema is different at runtime than provider.yaml.schema.json.
The development version of provider schema is more strict and changes together with
the code. The runtime version is more relaxed (allows for additional properties)
and verifies only the subset of fields that are needed at runtime.
"""
for entry_point, dist in entry_points_with_dist("apache_airflow_provider"):
Expand Down Expand Up @@ -542,9 +555,11 @@ def _discover_hooks_from_connection_types(
provider: ProviderInfo,
):
"""
Discover hooks from the "connection-types" property. This is new, better method that replaces
discovery from hook-class-names as it allows to lazy import individual Hook classes when they
are accessed. The "connection-types" keeps information about both - connection type and class
Discover hooks from the "connection-types" property.
This is new, better method that replaces discovery from hook-class-names as it
allows to lazy import individual Hook classes when they are accessed.
The "connection-types" keeps information about both - connection type and class
name so we can discover all connection-types without importing the classes.
:param hook_class_names_registered: set of registered hook class names for this provider
:param already_registered_warning_connection_types: set of connections for which warning should be
Expand Down Expand Up @@ -595,9 +610,10 @@ def _discover_hooks_from_hook_class_names(
provider_uses_connection_types: bool,
):
"""
Discovers hooks from "hook-class-names' property. This property is deprecated but we should
support it in Airflow 2. The hook-class-names array contained just Hook names without connection
type, therefore we need to import all those classes immediately to know which connection types
Discovers hooks from "hook-class-names' property.
This property is deprecated but we should support it in Airflow 2.
The hook-class-names array contained just Hook names without connection type,
therefore we need to import all those classes immediately to know which connection types
are supported. This makes it impossible to selectively only import those hooks that are used.
:param already_registered_warning_connection_types: list of connection hooks that we should warn
about when finished discovery
Expand Down Expand Up @@ -661,7 +677,7 @@ def _discover_hooks_from_hook_class_names(
)

def _discover_hooks(self) -> None:
"""Retrieves all connections defined in the providers via Hooks"""
"""Retrieves all connections defined in the providers via Hooks."""
for package_name, provider in self._provider_dict.items():
duplicated_connection_types: set[str] = set()
hook_class_names_registered: set[str] = set()
Expand All @@ -679,7 +695,7 @@ def _discover_hooks(self) -> None:

@provider_info_cache("import_all_hooks")
def _import_info_from_all_hooks(self):
"""Force-import all hooks and initialize the connections/fields"""
"""Force-import all hooks and initialize the connections/fields."""
# Retrieve all hooks to make sure that all of them are imported
_ = list(self._hooks_lazy_dict.values())
self._field_behaviours = OrderedDict(sorted(self._field_behaviours.items()))
Expand Down Expand Up @@ -721,7 +737,7 @@ def _add_taskflow_decorator(self, name, decorator_class_name: str, provider_pack

@staticmethod
def _get_attr(obj: Any, attr_name: str):
"""Retrieves attributes of an object, or warns if not found"""
"""Retrieves attributes of an object, or warns if not found."""
if not hasattr(obj, attr_name):
log.warning("The object '%s' is missing %s attribute and cannot be registered", obj, attr_name)
return None
Expand All @@ -735,10 +751,11 @@ def _import_hook(
package_name: str | None = None,
) -> HookInfo | None:
"""
Imports hook and retrieves hook information. Either connection_type (for lazy loading)
or hook_class_name must be set - but not both). Only needs package_name if hook_class_name is
passed (for lazy loading, package_name is retrieved from _connection_type_class_provider_dict
together with hook_class_name).
Imports hook and retrieves hook information.
Either connection_type (for lazy loading) or hook_class_name must be set - but not both).
Only needs package_name if hook_class_name is passed (for lazy loading, package_name
is retrieved from _connection_type_class_provider_dict together with hook_class_name).
:param connection_type: type of the connection
:param hook_class_name: name of the hook class
Expand Down Expand Up @@ -883,31 +900,31 @@ def _add_customized_fields(self, package_name: str, hook_class: type, customized
)

def _discover_extra_links(self) -> None:
"""Retrieves all extra links defined in the providers"""
"""Retrieves all extra links defined in the providers."""
for provider_package, provider in self._provider_dict.items():
if provider.data.get("extra-links"):
for extra_link_class_name in provider.data["extra-links"]:
if _sanity_check(provider_package, extra_link_class_name, provider):
self._extra_link_class_name_set.add(extra_link_class_name)

def _discover_logging(self) -> None:
"""Retrieves all logging defined in the providers"""
"""Retrieves all logging defined in the providers."""
for provider_package, provider in self._provider_dict.items():
if provider.data.get("logging"):
for logging_class_name in provider.data["logging"]:
if _sanity_check(provider_package, logging_class_name, provider):
self._logging_class_name_set.add(logging_class_name)

def _discover_secrets_backends(self) -> None:
"""Retrieves all secrets backends defined in the providers"""
"""Retrieves all secrets backends defined in the providers."""
for provider_package, provider in self._provider_dict.items():
if provider.data.get("secrets-backends"):
for secrets_backends_class_name in provider.data["secrets-backends"]:
if _sanity_check(provider_package, secrets_backends_class_name, provider):
self._secrets_backend_class_name_set.add(secrets_backends_class_name)

def _discover_auth_backends(self) -> None:
"""Retrieves all API auth backends defined in the providers"""
"""Retrieves all API auth backends defined in the providers."""
for provider_package, provider in self._provider_dict.items():
if provider.data.get("auth-backends"):
for auth_backend_module_name in provider.data["auth-backends"]:
Expand All @@ -923,8 +940,8 @@ def providers(self) -> dict[str, ProviderInfo]:
@property
def hooks(self) -> MutableMapping[str, HookInfo | None]:
"""
Returns dictionary of connection_type-to-hook mapping. Note that the dict can contain
None values if a hook discovered cannot be imported!
Returns dictionary of connection_type-to-hook mapping.
Note that the dict can contain None values if a hook discovered cannot be imported!
"""
self.initialize_providers_hooks()
# When we return hooks here it will only be used to retrieve hook information
Expand Down
Loading

0 comments on commit f8fa843

Please sign in to comment.