diff --git a/airflow-core/docs/core-concepts/debug.rst b/airflow-core/docs/core-concepts/debug.rst
index aa57cd4648f52..1e713229f214b 100644
--- a/airflow-core/docs/core-concepts/debug.rst
+++ b/airflow-core/docs/core-concepts/debug.rst
@@ -69,15 +69,6 @@ is manually ingested. The cleanup step is also skipped, making the intermediate
     run = dag.test(mark_success_pattern="wait_for_.*|cleanup")
     print(f"Intermediate csv: {run.get_task_instance('collect_stats').xcom_pull(task_id='collect_stats')}")
 
-Comparison with DebugExecutor
------------------------------
-
-The ``dag.test`` command has the following benefits over the :class:`~airflow.executors.debug_executor.DebugExecutor`
-class, which is now deprecated:
-
-1. It does not require running an executor at all. Tasks are run one at a time with no executor or scheduler logs.
-2. It is faster than running code with a DebugExecutor as it does not need to go through a scheduler loop.
-
 Debugging Airflow dags on the command line
 ******************************************
@@ -98,25 +89,6 @@ Run ``python -m pdb <path to dag file>.py`` for an interactive debugging experie
     (Pdb) run_this_last
 
-.. _executor:DebugExecutor:
-
-Debug Executor (deprecated)
-***************************
-
-The :class:`~airflow.executors.debug_executor.DebugExecutor` is meant as
-a debug tool and can be used from IDE. It is a single process executor that
-queues :class:`~airflow.models.taskinstance.TaskInstance` and executes them by running
-``_run_raw_task`` method.
-
-Due to its nature the executor can be used with SQLite database. When used
-with sensors the executor will change sensor mode to ``reschedule`` to avoid
-blocking the execution of DAG.
-
-Additionally ``DebugExecutor`` can be used in a fail-fast mode that will make
-all other running or scheduled tasks fail immediately. To enable this option set
-``AIRFLOW__DEBUG__FAIL_FAST=True`` or adjust ``fail_fast`` option in your ``airflow.cfg``.
-For more information on setting the configuration, see :doc:`../../howto/set-config`.
-
 **IDE setup steps:**
 
 1. Add ``main`` block at the end of your DAG file to make it runnable.
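For orientation, here is a self-contained sketch of the ``dag.test()`` flow that the retained docs describe. The dag id, task names, and the csv path are illustrative assumptions, not part of this patch:

```python
# Minimal sketch of the dag.test() + mark_success_pattern flow kept in debug.rst.
# The dag id, task names, and path below are hypothetical.
from airflow.decorators import dag, task


@dag(schedule=None)
def example_debug_dag():
    @task
    def wait_for_data():
        """Stands in for a slow upstream task; matched by the pattern below."""

    @task
    def collect_stats():
        return "/tmp/stats.csv"  # pretend intermediate artifact

    @task
    def cleanup():
        """Skipped via the pattern so the intermediate artifact stays inspectable."""

    wait_for_data() >> collect_stats() >> cleanup()


test_dag = example_debug_dag()

if __name__ == "__main__":
    # Tasks matching the regex are marked success without actually running.
    run = test_dag.test(mark_success_pattern="wait_for_.*|cleanup")
    print(run.get_task_instance("collect_stats").xcom_pull(task_id="collect_stats"))
```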
diff --git a/airflow-core/src/airflow/config_templates/config.yml b/airflow-core/src/airflow/config_templates/config.yml
index b35b179649523..74a4d7eb742b4 100644
--- a/airflow-core/src/airflow/config_templates/config.yml
+++ b/airflow-core/src/airflow/config_templates/config.yml
@@ -1290,17 +1290,6 @@ secrets:
       type: integer
       example: ~
       default: "900"
-debug:
-  description: ~
-  options:
-    fail_fast:
-      description: |
-        Used only with ``DebugExecutor``. If set to ``True`` DAG will fail with first
-        failed task. Helpful for debugging purposes.
-      version_added: 1.10.8
-      type: string
-      example: ~
-      default: "False"
 api:
   description: ~
   options:
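The option removed here was consumed in ``DebugExecutor.__init__`` (deleted below) via ``conf.getboolean("debug", "fail_fast")``, and like any Airflow option it could be overridden through the ``AIRFLOW__{SECTION}__{OPTION}`` environment variable. A minimal sketch, runnable only on versions that still ship the option since this patch deletes it:

```python
# Sketch of how the removed [debug] fail_fast option was read.
import os

# Equivalent to setting fail_fast = True under [debug] in airflow.cfg.
os.environ["AIRFLOW__DEBUG__FAIL_FAST"] = "True"

from airflow.configuration import conf  # config reads env overrides on access

assert conf.getboolean("debug", "fail_fast") is True
```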
diff --git a/airflow-core/src/airflow/executors/debug_executor.py b/airflow-core/src/airflow/executors/debug_executor.py
deleted file mode 100644
index 9d86f5e72703c..0000000000000
--- a/airflow-core/src/airflow/executors/debug_executor.py
+++ /dev/null
@@ -1,155 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""
-DebugExecutor.
-
-.. seealso::
-    For more information on how the DebugExecutor works, take a look at the guide:
-    :ref:`executor:DebugExecutor`
-"""
-
-from __future__ import annotations
-
-import threading
-import time
-from typing import TYPE_CHECKING, Any
-
-from airflow.executors.base_executor import BaseExecutor
-from airflow.utils.state import TaskInstanceState
-
-if TYPE_CHECKING:
-    from airflow.models.taskinstance import TaskInstance
-    from airflow.models.taskinstancekey import TaskInstanceKey
-
-
-class DebugExecutor(BaseExecutor):
-    """
-    This executor is meant for debugging purposes. It can be used with SQLite.
-
-    It executes one task instance at time. Additionally to support working
-    with sensors, all sensors ``mode`` will be automatically set to "reschedule".
-    """
-
-    _terminated = threading.Event()
-
-    is_production: bool = False
-
-    change_sensor_mode_to_reschedule: bool = True
-
-    def __init__(self):
-        super().__init__()
-        self.tasks_to_run: list[TaskInstance] = []
-        # Place where we keep information for task instance raw run
-        self.tasks_params: dict[TaskInstanceKey, dict[str, Any]] = {}
-        from airflow.configuration import conf
-
-        self.fail_fast = conf.getboolean("debug", "fail_fast")
-
-    def execute_async(self, *args, **kwargs) -> None:
-        """Replace the method with a custom trigger_task implementation."""
-
-    def sync(self) -> None:
-        task_succeeded = True
-        while self.tasks_to_run:
-            ti = self.tasks_to_run.pop(0)
-            if self.fail_fast and not task_succeeded:
-                self.log.info("Setting %s to %s", ti.key, TaskInstanceState.UPSTREAM_FAILED)
-                ti.set_state(TaskInstanceState.UPSTREAM_FAILED)
-                self.change_state(ti.key, TaskInstanceState.UPSTREAM_FAILED)
-            elif self._terminated.is_set():
-                self.log.info("Executor is terminated! Stopping %s to %s", ti.key, TaskInstanceState.FAILED)
-                ti.set_state(TaskInstanceState.FAILED)
-                self.fail(ti.key)
-            else:
-                task_succeeded = self._run_task(ti)
-
-    def _run_task(self, ti: TaskInstance) -> bool:
-        self.log.debug("Executing task: %s", ti)
-        key = ti.key
-        try:
-            params = self.tasks_params.pop(ti.key, {})
-            ti.run(**params)
-            self.success(key)
-            return True
-        except Exception as e:
-            ti.set_state(TaskInstanceState.FAILED)
-            self.fail(key)
-            self.log.exception("Failed to execute task: %s.", e)
-            return False
-
-    def queue_task_instance(
-        self,
-        task_instance: TaskInstance,
-        mark_success: bool = False,
-        ignore_all_deps: bool = False,
-        ignore_depends_on_past: bool = False,
-        wait_for_past_depends_before_skipping: bool = False,
-        ignore_task_deps: bool = False,
-        ignore_ti_state: bool = False,
-        pool: str | None = None,
-        cfg_path: str | None = None,
-    ) -> None:
-        """Queues task instance with empty command because we do not need it."""
-        if TYPE_CHECKING:
-            assert task_instance.task
-
-        self.queue_command(
-            task_instance,
-            [str(task_instance)],  # Just for better logging, it's not used anywhere
-            priority=task_instance.priority_weight,
-            queue=task_instance.task.queue,
-        )
-        # Save params for TaskInstance._run_raw_task
-        self.tasks_params[task_instance.key] = {
-            "mark_success": mark_success,
-            "pool": pool,
-        }
-
-    def trigger_tasks(self, open_slots: int) -> None:
-        """
-        Triggers tasks.
-
-        Instead of calling exec_async we just add task instance to tasks_to_run queue.
-
-        :param open_slots: Number of open slots
-        """
-        if not self.queued_tasks:
-            # wait a bit if there are no tasks ready to be executed to avoid spinning too fast in the void
-            time.sleep(0.5)
-            return
-
-        sorted_queue = sorted(
-            self.queued_tasks.items(),
-            key=lambda x: x[1][1],
-            reverse=True,
-        )
-        for _ in range(min((open_slots, len(self.queued_tasks)))):
-            key, (_, _, _, ti) = sorted_queue.pop(0)
-            self.queued_tasks.pop(key)
-            self.running.add(key)
-            self.tasks_to_run.append(ti)  # type: ignore
-
-    def end(self) -> None:
-        """Set states of queued tasks to UPSTREAM_FAILED marking them as not executed."""
-        for ti in self.tasks_to_run:
-            self.log.info("Setting %s to %s", ti.key, TaskInstanceState.UPSTREAM_FAILED)
-            ti.set_state(TaskInstanceState.UPSTREAM_FAILED)
-            self.change_state(ti.key, TaskInstanceState.UPSTREAM_FAILED)
-
-    def terminate(self) -> None:
-        self._terminated.set()
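The queueing behaviour being deleted is easy to lose in the diff: ``trigger_tasks`` drained ``queued_tasks``, whose values are ``(command, priority, queue, task_instance)`` tuples, highest priority first. An isolated, runnable sketch, with plain strings standing in for task instances:

```python
# Standalone illustration of the priority handling in the deleted
# DebugExecutor.trigger_tasks; strings stand in for TaskInstance objects.
queued_tasks = {
    "ti_low": (None, 1, None, "low priority"),
    "ti_high": (None, 3, None, "high priority"),
    "ti_mid": (None, 2, None, "mid priority"),
}
open_slots = 2
tasks_to_run = []

# Sort by the priority element of the (command, priority, queue, ti) tuple.
sorted_queue = sorted(queued_tasks.items(), key=lambda x: x[1][1], reverse=True)
for _ in range(min(open_slots, len(queued_tasks))):
    key, (_, _, _, ti) = sorted_queue.pop(0)
    queued_tasks.pop(key)
    tasks_to_run.append(ti)

print(tasks_to_run)  # ['high priority', 'mid priority']
```

Note that ``range(min(...))`` is evaluated once before the loop, so popping from ``queued_tasks`` inside the loop is safe.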
diff --git a/airflow-core/src/airflow/executors/executor_constants.py b/airflow-core/src/airflow/executors/executor_constants.py
index 5d752e23233d3..852db43eea3fc 100644
--- a/airflow-core/src/airflow/executors/executor_constants.py
+++ b/airflow-core/src/airflow/executors/executor_constants.py
@@ -31,13 +31,11 @@ class ConnectorSource(Enum):
 SEQUENTIAL_EXECUTOR = "SequentialExecutor"
 CELERY_EXECUTOR = "CeleryExecutor"
 KUBERNETES_EXECUTOR = "KubernetesExecutor"
-DEBUG_EXECUTOR = "DebugExecutor"
 MOCK_EXECUTOR = "MockExecutor"
 CORE_EXECUTOR_NAMES = {
     LOCAL_EXECUTOR,
     SEQUENTIAL_EXECUTOR,
     CELERY_EXECUTOR,
     KUBERNETES_EXECUTOR,
-    DEBUG_EXECUTOR,
     MOCK_EXECUTOR,
 }
diff --git a/airflow-core/src/airflow/executors/executor_loader.py b/airflow-core/src/airflow/executors/executor_loader.py
index 80fed5c727871..6624d14e23fc5 100644
--- a/airflow-core/src/airflow/executors/executor_loader.py
+++ b/airflow-core/src/airflow/executors/executor_loader.py
@@ -26,7 +26,6 @@
 from airflow.executors.executor_constants import (
     CELERY_EXECUTOR,
     CORE_EXECUTOR_NAMES,
-    DEBUG_EXECUTOR,
     KUBERNETES_EXECUTOR,
     LOCAL_EXECUTOR,
     SEQUENTIAL_EXECUTOR,
@@ -61,7 +60,6 @@ class ExecutorLoader:
         CELERY_EXECUTOR: "airflow.providers.celery.executors.celery_executor.CeleryExecutor",
         KUBERNETES_EXECUTOR: "airflow.providers.cncf.kubernetes."
         "executors.kubernetes_executor.KubernetesExecutor",
-        DEBUG_EXECUTOR: "airflow.executors.debug_executor.DebugExecutor",
     }
 
     @classmethod
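The table trimmed above maps a core executor name from ``[core] executor`` to a dotted import path. A hedged sketch of the lookup-and-import step (the real ``ExecutorLoader`` also handles aliases, plugins, and validation, so this is not the actual implementation):

```python
# Simplified sketch of resolving an executor name to a class, in the spirit
# of the ExecutorLoader.executors table above.
import importlib

CORE_EXECUTORS = {
    "CeleryExecutor": "airflow.providers.celery.executors.celery_executor.CeleryExecutor",
    "KubernetesExecutor": "airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor",
}


def import_executor_class(name: str):
    # Known short names resolve through the table; anything else is assumed
    # to already be a full dotted path to an executor class.
    dotted_path = CORE_EXECUTORS.get(name, name)
    module_name, class_name = dotted_path.rsplit(".", 1)
    return getattr(importlib.import_module(module_name), class_name)
```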
diff --git a/airflow-core/tests/unit/cli/commands/test_standalone_command.py b/airflow-core/tests/unit/cli/commands/test_standalone_command.py
index 6151493ee6efa..e90da7e369be9 100644
--- a/airflow-core/tests/unit/cli/commands/test_standalone_command.py
+++ b/airflow-core/tests/unit/cli/commands/test_standalone_command.py
@@ -26,7 +26,6 @@
 from airflow.executors import executor_loader
 from airflow.executors.executor_constants import (
     CELERY_EXECUTOR,
-    DEBUG_EXECUTOR,
     KUBERNETES_EXECUTOR,
     LOCAL_EXECUTOR,
     SEQUENTIAL_EXECUTOR,
@@ -41,12 +40,10 @@ class TestStandaloneCommand:
             (SEQUENTIAL_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR),
             (CELERY_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR),
             (KUBERNETES_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR),
-            (DEBUG_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR),
             (LOCAL_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR),
             (SEQUENTIAL_EXECUTOR, "other_db_conn_string", SEQUENTIAL_EXECUTOR),
             (CELERY_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR),
             (KUBERNETES_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR),
-            (DEBUG_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR),
         ],
     )
     def test_calculate_env(self, conf_executor_name, conf_sql_alchemy_conn, expected_standalone_executor):
diff --git a/airflow-core/tests/unit/executors/test_debug_executor.py b/airflow-core/tests/unit/executors/test_debug_executor.py
deleted file mode 100644
index fdafa68ad5aa6..0000000000000
--- a/airflow-core/tests/unit/executors/test_debug_executor.py
+++ /dev/null
@@ -1,147 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from __future__ import annotations
-
-from unittest import mock
-from unittest.mock import MagicMock
-
-from airflow.executors.debug_executor import DebugExecutor
-from airflow.utils.state import State
-
-
-class TestDebugExecutor:
-    @mock.patch("airflow.executors.debug_executor.DebugExecutor._run_task")
-    def test_sync(self, run_task_mock):
-        run_task_mock.return_value = True
-
-        executor = DebugExecutor()
-
-        ti1 = MagicMock(key="t1")
-        ti2 = MagicMock(key="t2")
-        executor.tasks_to_run = [ti1, ti2]
-
-        executor.sync()
-        assert not executor.tasks_to_run
-        run_task_mock.assert_has_calls([mock.call(ti1), mock.call(ti2)])
-
-    @mock.patch("airflow.models.taskinstance.TaskInstance")
-    def test_run_task(self, task_instance_mock):
-        ti_key = "key"
-        job_id = " job_id"
-        task_instance_mock.key = ti_key
-        task_instance_mock.job_id = job_id
-
-        executor = DebugExecutor()
-        executor.running = {ti_key}
-        succeeded = executor._run_task(task_instance_mock)
-
-        assert succeeded
-        task_instance_mock.run.assert_called()
-
-    def test_queue_task_instance(self):
-        key = "ti_key"
-        ti = MagicMock(key=key)
-
-        executor = DebugExecutor()
-        executor.queue_task_instance(task_instance=ti, mark_success=True, pool="pool")
-
-        assert key in executor.queued_tasks
-        assert key in executor.tasks_params
-        assert executor.tasks_params[key] == {
-            "mark_success": True,
-            "pool": "pool",
-        }
-
-    def test_trigger_tasks(self):
-        execute_mock = MagicMock()
-        executor = DebugExecutor()
-        executor.execute_async = execute_mock
-
-        executor.queued_tasks = {
-            "t1": (None, 1, None, MagicMock(key="t1")),
-            "t2": (None, 2, None, MagicMock(key="t2")),
-        }
-
-        executor.trigger_tasks(open_slots=4)
-        assert not executor.queued_tasks
-        assert len(executor.running) == 2
-        assert len(executor.tasks_to_run) == 2
-        assert not execute_mock.called
-
-    def test_end(self):
-        ti = MagicMock(key="ti_key")
-
-        executor = DebugExecutor()
-        executor.tasks_to_run = [ti]
-        executor.running = {ti.key}
-        executor.end()
-
-        ti.set_state.assert_called_once_with(State.UPSTREAM_FAILED)
-        assert not executor.running
-
-    @mock.patch("airflow.executors.debug_executor.DebugExecutor.change_state")
-    def test_fail_fast(self, change_state_mock):
-        with mock.patch.dict("os.environ", {"AIRFLOW__DEBUG__FAIL_FAST": "True"}):
-            executor = DebugExecutor()
-
-            ti1 = MagicMock(key="t1")
-            ti2 = MagicMock(key="t2")
-
-            ti1.run.side_effect = Exception
-
-            executor.tasks_to_run = [ti1, ti2]
-
-            executor.sync()
-
-            assert executor.fail_fast
-            assert not executor.tasks_to_run
-            change_state_mock.assert_has_calls(
-                [
-                    mock.call(ti1.key, State.FAILED, None),
-                    mock.call(ti2.key, State.UPSTREAM_FAILED),
-                ]
-            )
-
-    def test_reschedule_mode(self):
-        assert DebugExecutor.change_sensor_mode_to_reschedule
-
-    def test_is_production_default_value(self):
-        assert not DebugExecutor.is_production
-
-    @mock.patch("time.sleep", autospec=True)
-    def test_trigger_sleep_when_no_task(self, mock_sleep):
-        execute_mock = MagicMock()
-        executor = DebugExecutor()
-        executor.execute_async = execute_mock
-        executor.queued_tasks = {}
-        executor.trigger_tasks(open_slots=5)
-        mock_sleep.assert_called()
-
-    @mock.patch("airflow.executors.debug_executor.DebugExecutor.change_state")
-    def test_sync_after_terminate(self, change_state_mock):
-        executor = DebugExecutor()
-
-        ti1 = MagicMock(key="t1")
-        executor.tasks_to_run = [ti1]
-        executor.terminate()
-        executor.sync()
-
-        change_state_mock.assert_has_calls(
-            [
-                mock.call(ti1.key, State.FAILED, None),
-            ]
-        )
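The rows dropped from ``test_calculate_env`` encode a fallback rule that survives this patch: standalone downgrades the configured executor to one the configured database can support. A simplified sketch of that decision (the function name is hypothetical; the real logic lives in ``StandaloneCommand``):

```python
# Simplified sketch of the fallback exercised by test_calculate_env above.
def choose_standalone_executor(configured: str, sql_alchemy_conn: str) -> str:
    if sql_alchemy_conn.startswith("sqlite"):
        return "SequentialExecutor"  # SQLite cannot take parallel writers
    if configured in ("LocalExecutor", "SequentialExecutor"):
        return configured  # already runnable on a single host
    return "LocalExecutor"  # distributed executors fall back to local


# Mirrors rows from the parametrized test table.
assert choose_standalone_executor("CeleryExecutor", "sqlite_conn_string") == "SequentialExecutor"
assert choose_standalone_executor("CeleryExecutor", "other_db_conn_string") == "LocalExecutor"
assert choose_standalone_executor("SequentialExecutor", "other_db_conn_string") == "SequentialExecutor"
```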
diff --git a/airflow-core/tests/unit/executors/test_executor_loader.py b/airflow-core/tests/unit/executors/test_executor_loader.py
index 57428440c8d2a..1ef270d11e97b 100644
--- a/airflow-core/tests/unit/executors/test_executor_loader.py
+++ b/airflow-core/tests/unit/executors/test_executor_loader.py
@@ -45,7 +45,6 @@ def test_no_executor_configured(self):
         "executor_name",
         [
             "CeleryExecutor",
-            "DebugExecutor",
             "KubernetesExecutor",
             "LocalExecutor",
         ],
@@ -266,8 +265,8 @@ def test_get_hybrid_executors_from_config_duplicates_should_fail(self, executor_
         "executor_config",
         [
             "Celery::Executor, LocalExecutor",
-            "LocalExecutor, Ce:ler:yExecutor, DebugExecutor",
-            "LocalExecutor, CeleryExecutor:, DebugExecutor",
+            "LocalExecutor, Ce:ler:yExecutor",
+            "LocalExecutor, CeleryExecutor:",
             "LocalExecutor, my_cool_alias:",
             "LocalExecutor, my_cool_alias:CeleryExecutor",
             "LocalExecutor, module.path.first:alias_second",
@@ -282,11 +281,10 @@ def test_get_hybrid_executors_from_config_core_executors_bad_config_format(self,
         ("executor_config", "expected_value"),
         [
             ("CeleryExecutor", "CeleryExecutor"),
-            ("DebugExecutor", "DebugExecutor"),
             ("KubernetesExecutor", "KubernetesExecutor"),
             ("LocalExecutor", "LocalExecutor"),
             ("CeleryExecutor, LocalExecutor", "CeleryExecutor"),
-            ("LocalExecutor, CeleryExecutor, DebugExecutor", "LocalExecutor"),
+            ("LocalExecutor, CeleryExecutor", "LocalExecutor"),
         ],
     )
     def test_should_support_import_executor_from_core(self, executor_config, expected_value):
diff --git a/contributing-docs/09_testing.rst b/contributing-docs/09_testing.rst
index 6538b9e185d92..06d1d6a880aaa 100644
--- a/contributing-docs/09_testing.rst
+++ b/contributing-docs/09_testing.rst
@@ -53,7 +53,7 @@ You can also run other kinds of tests when you are developing airflow packages:
   client works correctly.
 * `DAG testing <testing/dag_testing.rst>`__ is a document that describes how to test DAGs in a local environment
-  with ``DebugExecutor``. Note, that this is a legacy method - you can now use dag.test() method to test DAGs.
+  with ``dag.test()``.
 
 ------
 
diff --git a/contributing-docs/quick-start-ide/contributors_quick_start_vscode.rst b/contributing-docs/quick-start-ide/contributors_quick_start_vscode.rst
index d58942b1d1e3e..34991d027155c 100644
--- a/contributing-docs/quick-start-ide/contributors_quick_start_vscode.rst
+++ b/contributing-docs/quick-start-ide/contributors_quick_start_vscode.rst
@@ -109,7 +109,7 @@ Setting up debugging
 
     if __name__ == "__main__":
         dag.test()
 
-- Add ``"AIRFLOW__CORE__EXECUTOR": "DebugExecutor"`` to the ``"env"`` field of Debug configuration.
+- Add ``"AIRFLOW__CORE__EXECUTOR": "LocalExecutor"`` to the ``"env"`` field of Debug configuration.
 
 - Using the ``Run`` view click on ``Create a launch.json file``
 
@@ -133,7 +133,7 @@ Setting up debugging
                 "program": "${workspaceFolder}/files/dags/example_bash_operator.py",
                 "env": {
                     "PYTHONUNBUFFERED": "1",
-                    "AIRFLOW__CORE__EXECUTOR": "DebugExecutor"
+                    "AIRFLOW__CORE__EXECUTOR": "LocalExecutor"
                 },
                 "python": "${env:HOME}/.pyenv/versions/airflow/bin/python"
             ]
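The launch.json environment shown above can also be reproduced without VS Code. A sketch of a plain script suitable for ``python -m pdb`` (the example DAG import follows the quick-start doc; adjust it for your own DAG file):

```python
# Mirror of the launch.json "env" block as a plain script.
import os

# Must be set before importing airflow, since the config is read at import time.
os.environ["PYTHONUNBUFFERED"] = "1"
os.environ["AIRFLOW__CORE__EXECUTOR"] = "LocalExecutor"

from airflow.example_dags.example_bash_operator import dag

if __name__ == "__main__":
    dag.test()  # breakpoints set in the IDE or pdb hit inside task code
```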