Skip to content

Commit

Permalink
More executor cleanup to remove Plugin support (#43598)
Browse files Browse the repository at this point in the history
  • Loading branch information
o-nikolas authored Nov 1, 2024
1 parent 1f7a58a commit c6d6ca2
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 19 deletions.
1 change: 0 additions & 1 deletion airflow/executors/executor_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ class ConnectorSource(Enum):
"""Enum of supported executor import sources."""

CORE = "core"
PLUGIN = "plugin"
CUSTOM_PATH = "custom path"


Expand Down
11 changes: 5 additions & 6 deletions airflow/executors/executor_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def _get_executor_names(cls) -> list[ExecutorName]:
# paths won't be provided by the user in that case.
if core_executor_module := cls.executors.get(name):
executor_names.append(ExecutorName(alias=name, module_path=core_executor_module))
# Only a module path or plugin name was provided
# A module path was provided
else:
executor_names.append(ExecutorName(alias=None, module_path=name))
# An alias was provided with the module path
Expand All @@ -104,20 +104,20 @@ def _get_executor_names(cls) -> list[ExecutorName]:
# (e.g. my_local_exec_alias:LocalExecutor). Allowing this makes things unnecessarily
# complicated. Multiple Executors of the same type will be supported by a future multitenancy
# AIP.
# The module component should always be a module or plugin path.
# The module component should always be a module path.
module_path = split_name[1]
if not module_path or module_path in CORE_EXECUTOR_NAMES or "." not in module_path:
raise AirflowConfigException(
"Incorrectly formatted executor configuration. Second portion of an executor "
f"configuration must be a module path or plugin but received: {module_path}"
f"configuration must be a module path but received: {module_path}"
)
else:
executor_names.append(ExecutorName(alias=split_name[0], module_path=split_name[1]))
else:
raise AirflowConfigException(f"Incorrectly formatted executor configuration: {name}")

# As of now, we do not allow duplicate executors.
# Add all module paths/plugin names to a set, since the actual code is what is unique
# Add all module paths to a set, since the actual code is what is unique
unique_modules = set([exec_name.module_path for exec_name in executor_names])
if len(unique_modules) < len(executor_names):
msg = (
Expand Down Expand Up @@ -216,7 +216,6 @@ def load_executor(cls, executor_name: ExecutorName | str | None) -> BaseExecutor
This supports the following formats:
* by executor name for core executor
* by ``{plugin_name}.{class_name}`` for executor from plugins
* by import path
* by class name of the Executor
* by ExecutorName object specification
Expand Down Expand Up @@ -271,7 +270,7 @@ def import_executor_cls(
Supports the same formats as ExecutorLoader.load_executor.
:param executor_name: Name of core executor or module path to provider provided as a plugin.
:param executor_name: Name of core executor or module path to executor.
:param validate: Whether or not to validate the executor before returning
:return: executor class via executor_name and executor import source
Expand Down
11 changes: 1 addition & 10 deletions airflow/executors/executor_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,8 @@ def __init__(self, module_path, alias=None):
def set_connector_source(self):
if self.alias in CORE_EXECUTOR_NAMES:
self.connector_source = ConnectorSource.CORE
# If there is only one dot, then this is likely a plugin. This is the best we can do
# to determine.
elif self.module_path.count(".") == 1:
self.log.debug(
"The executor name looks like the plugin path (executor_name=%s) due to having "
"just two period delimited parts. Treating executor as a plugin",
self.module_path,
)
self.connector_source = ConnectorSource.PLUGIN
# Executor must be a module
else:
# Executor must be a module
self.connector_source = ConnectorSource.CUSTOM_PATH

def __repr__(self):
Expand Down
4 changes: 2 additions & 2 deletions tests/executors/test_executor_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def test_should_support_custom_path(self):
),
],
),
# Core executors and custom module path executor and plugin
# Core executors and custom module path executor
(
"CeleryExecutor, LocalExecutor, tests.executors.test_executor_loader.FakeExecutor",
[
Expand All @@ -120,7 +120,7 @@ def test_should_support_custom_path(self):
),
],
),
# Core executors and custom module path executor and plugin with aliases
# Core executors and custom module path executor with aliases
(
(
"CeleryExecutor, LocalExecutor, fake_exec:tests.executors.test_executor_loader.FakeExecutor"
Expand Down

0 comments on commit c6d6ca2

Please sign in to comment.