diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 8c3330f0332b2..f58ea7bfea981 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -208,6 +208,16 @@
       type: string
       example: ~
       default: "{AIRFLOW_HOME}/plugins"
+    - name: execute_tasks_new_python_interpreter
+      description: |
+        Should tasks be executed via forking of the parent process ("False",
+        the speedier option) or by spawning a new python process ("True",
+        slower, but plugin changes are picked up by tasks straight away)
+      default: "False"
+      example: ~
+      version_added: "2.0.0"
+      see_also: ":ref:`plugins:loading`"
+      type: boolean
     - name: fernet_key
       description: |
         Secret key to save connection passwords in the db
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index f5adb4306b061..2665749ae1428 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -133,6 +133,11 @@ load_default_connections = True
 # Where your Airflow plugins are stored
 plugins_folder = {AIRFLOW_HOME}/plugins
 
+# Should tasks be executed via forking of the parent process ("False",
+# the speedier option) or by spawning a new python process ("True",
+# slower, but plugin changes are picked up by tasks straight away)
+execute_tasks_new_python_interpreter = False
+
 # Secret key to save connection passwords in the db
 fernet_key = {FERNET_KEY}
 
diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py
index d9061f78a6ffe..c3c8358dbda3d 100644
--- a/airflow/executors/local_executor.py
+++ b/airflow/executors/local_executor.py
@@ -22,12 +22,17 @@
 For more information on how the LocalExecutor works, take a look at the guide:
 :ref:`executor:LocalExecutor`
 """
+import os
 import subprocess
+from abc import abstractmethod
 from multiprocessing import Manager, Process
 from multiprocessing.managers import SyncManager
 from queue import Empty, Queue  # pylint: disable=unused-import # noqa: F401
 from typing import Any, List, Optional, Tuple, Union  # pylint: disable=unused-import # noqa: F401
 
+from setproctitle import setproctitle  # pylint: disable=no-name-in-module
+
+from airflow import settings
 from airflow.exceptions import AirflowException
 from airflow.executors.base_executor import NOT_STARTED_MESSAGE, PARALLELISM, BaseExecutor, CommandType
 from airflow.models.taskinstance import (  # pylint: disable=unused-import # noqa: F401
@@ -51,10 +56,16 @@ class LocalWorkerBase(Process, LoggingMixin):
     """
 
     def __init__(self, result_queue: 'Queue[TaskInstanceStateType]'):
-        super().__init__()
+        super().__init__(target=self.do_work)
         self.daemon: bool = True
         self.result_queue: 'Queue[TaskInstanceStateType]' = result_queue
 
+    def run(self):
+        # We know we've just started a new process, so let's disconnect from the metadata db now
+        settings.engine.pool.dispose()
+        settings.engine.dispose()
+        return super().run()
+
     def execute_work(self, key: TaskInstanceKey, command: CommandType) -> None:
         """
         Executes command received and stores result state in queue.
@@ -64,14 +75,61 @@ def execute_work(self, key: TaskInstanceKey, command: CommandType) -> None:
         """
         if key is None:
             return
 
+        self.log.info("%s running %s", self.__class__.__name__, command)
+        if settings.EXECUTE_TASKS_NEW_PYTHON_INTERPRETER:
+            state = self._execute_work_in_subprocess(command)
+        else:
+            state = self._execute_work_in_fork(command)
+
+        self.result_queue.put((key, state))
+
+    def _execute_work_in_subprocess(self, command: CommandType) -> str:
         try:
             subprocess.check_call(command, close_fds=True)
-            state = State.SUCCESS
+            return State.SUCCESS
         except subprocess.CalledProcessError as e:
-            state = State.FAILED
             self.log.error("Failed to execute task %s.", str(e))
-        self.result_queue.put((key, state))
+            return State.FAILED
+
+    def _execute_work_in_fork(self, command: CommandType) -> str:
+        pid = os.fork()
+        if pid:
+            # In parent, wait for the child
+            pid, ret = os.waitpid(pid, 0)
+            return State.SUCCESS if ret == 0 else State.FAILED
+
+        from airflow.sentry import Sentry
+        ret = 1
+        try:
+            import signal
+
+            from airflow.cli.cli_parser import get_parser
+
+            signal.signal(signal.SIGINT, signal.SIG_DFL)
+            signal.signal(signal.SIGTERM, signal.SIG_DFL)
+
+            parser = get_parser()
+            # [1:] - remove "airflow" from the start of the command
+            args = parser.parse_args(command[1:])
+
+            setproctitle(f"airflow task supervisor: {command}")
+
+            args.func(args)
+            ret = 0
+            return State.SUCCESS
+        except Exception as e:  # pylint: disable=broad-except
+            self.log.error("Failed to execute task %s.", str(e))
+        finally:
+            Sentry.flush()
+            os._exit(ret)  # pylint: disable=protected-access
+
+    @abstractmethod
+    def do_work(self):
+        """
+        Called in the subprocess and should then execute tasks
+        """
+        raise NotImplementedError()
 
 
 class LocalWorker(LocalWorkerBase):
@@ -91,7 +149,7 @@ def __init__(self,
         self.key: TaskInstanceKey = key
         self.command: CommandType = command
 
-    def run(self) -> None:
+    def do_work(self) -> None:
         self.execute_work(key=self.key, command=self.command)
 
 
@@ -111,7 +169,7 @@ def __init__(self,
         super().__init__(result_queue=result_queue)
         self.task_queue = task_queue
 
-    def run(self) -> None:
+    def do_work(self) -> None:
         while True:
             key, command = self.task_queue.get()
             try:
diff --git a/airflow/settings.py b/airflow/settings.py
index 1e5ca06aceddf..0fe1b93fa9c1d 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -365,3 +365,11 @@ def initialize():
 # to get all the logs from the print & log statements in the DAG files before a task is run
 # The handlers are restored after the task completes execution.
 DONOT_MODIFY_HANDLERS = conf.getboolean('logging', 'donot_modify_handlers', fallback=False)
+
+CAN_FORK = hasattr(os, "fork")
+
+EXECUTE_TASKS_NEW_PYTHON_INTERPRETER = not CAN_FORK or conf.getboolean(
+    'core',
+    'execute_tasks_new_python_interpreter',
+    fallback=False,
+)
diff --git a/airflow/task/task_runner/standard_task_runner.py b/airflow/task/task_runner/standard_task_runner.py
index 32a0c845d2a09..daeb2675cbf7a 100644
--- a/airflow/task/task_runner/standard_task_runner.py
+++ b/airflow/task/task_runner/standard_task_runner.py
@@ -21,11 +21,10 @@
 import psutil
 from setproctitle import setproctitle  # pylint: disable=no-name-in-module
 
+from airflow.settings import CAN_FORK
 from airflow.task.task_runner.base_task_runner import BaseTaskRunner
 from airflow.utils.process_utils import reap_process_group
 
-CAN_FORK = hasattr(os, "fork")
-
 
 class StandardTaskRunner(BaseTaskRunner):
     """
diff --git a/docs/configurations-ref.rst b/docs/configurations-ref.rst
index e5b2cda18c244..1fba461df6331 100644
--- a/docs/configurations-ref.rst
+++ b/docs/configurations-ref.rst
@@ -53,6 +53,10 @@ can set in ``airflow.cfg`` file or using environment variables.
 {{ option["description"] }}
 {% endif %}
 
+{% if option.get("see_also") %}
+.. seealso:: {{ option["see_also"] }}
+{% endif %}
+
 :Type: {{ option["type"] }}
 :Default: ``{{ "''" if option["default"] == "" else option["default"] }}``
 :Environment Variable: ``AIRFLOW__{{ section["name"] | upper }}__{{ option["name"] | upper }}``
diff --git a/docs/modules_management.rst b/docs/modules_management.rst
index 3130e516913d8..3632c74d72e21 100644
--- a/docs/modules_management.rst
+++ b/docs/modules_management.rst
@@ -228,6 +228,8 @@ Python PATH: [/home/rootcss/venvs/airflow/bin:/usr/lib/python38.zip:/usr/lib/pyt
 
 Below is the sample output of the ``airflow info`` command:
 
+.. seealso:: :ref:`plugins:loading`
+
 .. code-block:: none
 
     Apache Airflow [1.10.11]
diff --git a/docs/plugins.rst b/docs/plugins.rst
index cf278045e447f..bc3350141bd9c 100644
--- a/docs/plugins.rst
+++ b/docs/plugins.rst
@@ -66,6 +66,25 @@ Airflow has many components that can be reused when building an application:
 * Airflow is deployed, you can just piggy back on its deployment logistics
 * Basic charting capabilities, underlying libraries and abstractions
 
+.. _plugins:loading:
+
+When are plugins (re)loaded?
+----------------------------
+
+Plugins are loaded once at the start of every Airflow process and are never reloaded.
+
+This means that if you make any changes to plugins and you want the webserver or scheduler to use that new
+code, you will need to restart those processes.
+
+By default, task execution uses forking to avoid the slowdown of creating a whole new Python interpreter
+and re-parsing all of the Airflow code and start-up routines -- this is a big benefit for shorter-running
+tasks. This does mean that if you use plugins in your tasks and want changes to them picked up, you will
+need to restart the worker (if using the CeleryExecutor) or the scheduler (Local or Sequential executors).
+The other option is to accept the start-up speed hit and set ``core.execute_tasks_new_python_interpreter``
+to True in the config, which launches a whole new Python interpreter for each task.
+
+(Modules only imported by DAG files, on the other hand, do not suffer this problem, as DAG files are not
+loaded/parsed in any long-running Airflow process.)
 
 Interface
 ---------
 
diff --git a/tests/executors/test_local_executor.py b/tests/executors/test_local_executor.py
index 7fc909cdf1c19..20722bf478fd4 100644
--- a/tests/executors/test_local_executor.py
+++ b/tests/executors/test_local_executor.py
@@ -20,6 +20,8 @@
 import unittest
 from unittest import mock
 
+from airflow import settings
+from airflow.exceptions import AirflowException
 from airflow.executors.local_executor import LocalExecutor
 from airflow.utils.state import State
 
@@ -29,9 +31,9 @@ class TestLocalExecutor(unittest.TestCase):
     TEST_SUCCESS_COMMANDS = 5
 
     @mock.patch('airflow.executors.local_executor.subprocess.check_call')
-    def execution_parallelism(self, mock_check_call, parallelism=0):
-        success_command = ['airflow', 'tasks', 'run', 'true', 'some_parameter']
-        fail_command = ['airflow', 'tasks', 'run', 'false']
+    def execution_parallelism_subprocess(self, mock_check_call, parallelism=0):
+        success_command = ['airflow', 'tasks', 'run', 'true', 'some_parameter', '2020-10-07']
+        fail_command = ['airflow', 'tasks', 'run', 'false', 'task_id', '2020-10-07']
 
         def fake_execute_command(command, close_fds=True):  # pylint: disable=unused-argument
             if command != success_command:
@@ -41,6 +43,23 @@ def fake_execute_command(command, close_fds=True):  # pylint: disable=unused-arg
 
         mock_check_call.side_effect = fake_execute_command
 
+        self._test_execute(parallelism, success_command, fail_command)
+
+    @mock.patch('airflow.cli.commands.task_command.task_run')
+    def execution_parallelism_fork(self, mock_run, parallelism=0):
+        success_command = ['airflow', 'tasks', 'run', 'success', 'some_parameter', '2020-10-07']
+        fail_command = ['airflow', 'tasks', 'run', 'failure', 'some_parameter', '2020-10-07']
+
+        def fake_task_run(args):
+            if args.dag_id != 'success':
+                raise AirflowException('Simulate failed task')
+
+        mock_run.side_effect = fake_task_run
+
+        self._test_execute(parallelism, success_command, fail_command)
+
+    def _test_execute(self, parallelism, success_command, fail_command):
+
         executor = LocalExecutor(parallelism=parallelism)
         executor.start()
 
@@ -71,12 +90,25 @@ def fake_execute_command(command, close_fds=True):  # pylint: disable=unused-arg
         expected = self.TEST_SUCCESS_COMMANDS + 1 if parallelism == 0 else parallelism
         self.assertEqual(executor.workers_used, expected)
 
-    def test_execution_unlimited_parallelism(self):
-        self.execution_parallelism(parallelism=0)  # pylint: disable=no-value-for-parameter
-
-    def test_execution_limited_parallelism(self):
-        test_parallelism = 2
-        self.execution_parallelism(parallelism=test_parallelism)  # pylint: disable=no-value-for-parameter
+    def test_execution_subprocess_unlimited_parallelism(self):
+        with mock.patch.object(settings, 'EXECUTE_TASKS_NEW_PYTHON_INTERPRETER',
+                               new_callable=mock.PropertyMock) as option:
+            option.return_value = True
+            self.execution_parallelism_subprocess(parallelism=0)  # pylint: disable=no-value-for-parameter
+
+    def test_execution_subprocess_limited_parallelism(self):
+        with mock.patch.object(settings, 'EXECUTE_TASKS_NEW_PYTHON_INTERPRETER',
+                               new_callable=mock.PropertyMock) as option:
+            option.return_value = True
+            self.execution_parallelism_subprocess(parallelism=2)  # pylint: disable=no-value-for-parameter
+
+    @mock.patch.object(settings, 'EXECUTE_TASKS_NEW_PYTHON_INTERPRETER', False)
+    def test_execution_unlimited_parallelism_fork(self):
+        self.execution_parallelism_fork(parallelism=0)  # pylint: disable=no-value-for-parameter
+
+    @mock.patch.object(settings, 'EXECUTE_TASKS_NEW_PYTHON_INTERPRETER', False)
+    def test_execution_limited_parallelism_fork(self):
+        self.execution_parallelism_fork(parallelism=2)  # pylint: disable=no-value-for-parameter
 
     @mock.patch('airflow.executors.local_executor.LocalExecutor.sync')
     @mock.patch('airflow.executors.base_executor.BaseExecutor.trigger_tasks')
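
For context, the fork/waitpid pattern that the new _execute_work_in_fork method relies on can be sketched
in isolation as follows. This is only a minimal illustration, not the patch's implementation: run_task and
the example command are hypothetical stand-ins, and, like the fork path in the patch itself, it assumes a
Unix-like OS where os.fork is available.

    import os
    import sys


    def run_task(command):
        # Hypothetical stand-in for parsing the CLI args and calling args.func(args).
        print(f"child {os.getpid()} running {command}")


    def execute_in_fork(command):
        pid = os.fork()
        if pid:
            # Parent: block until the child exits; a zero status means success.
            _, status = os.waitpid(pid, 0)
            return "success" if status == 0 else "failed"

        # Child: do the work, then always leave via os._exit() so control never
        # returns into the parent's code path (the executor's event loop).
        ret = 1
        try:
            run_task(command)
            ret = 0
        except Exception as exc:
            print(f"Failed to execute task: {exc}", file=sys.stderr)
        finally:
            os._exit(ret)


    if __name__ == "__main__":
        print(execute_in_fork(["airflow", "tasks", "run", "example_dag", "example_task", "2020-10-07"]))

The design point mirrored in the patch is that the forked child inherits the already-imported Airflow
modules (and plugins) from its parent, skipping the interpreter start-up cost; the trade-off, described in
the docs/plugins.rst addition above, is that plugin changes are only picked up after the forking process is
restarted, unless core.execute_tasks_new_python_interpreter is set to True (or the equivalent
AIRFLOW__CORE__EXECUTE_TASKS_NEW_PYTHON_INTERPRETER environment variable is used).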