10 changes: 10 additions & 0 deletions airflow/config_templates/config.yml
@@ -208,6 +208,16 @@
type: string
example: ~
default: "{AIRFLOW_HOME}/plugins"
- name: execute_tasks_new_python_interpreter
description: |
Should tasks be executed via forking of the parent process ("False",
the speedier option) or by spawning a new python process ("True",
slower, but it means plugin changes are picked up by tasks straight away)
default: "False"
example: ~
version_added: "2.0.0"
see_also: ":ref:`plugins:loading`"
type: boolean
- name: fernet_key
description: |
Secret key to save connection passwords in the db
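
The new ``execute_tasks_new_python_interpreter`` option can also be toggled without editing ``airflow.cfg``, via Airflow's ``AIRFLOW__{SECTION}__{OPTION}`` environment-variable convention (the same convention rendered in ``configurations-ref.rst`` below). A minimal sketch, assuming the option is read through the standard config layer:

.. code-block:: python

    # Sketch: enable the new-interpreter behaviour via the environment instead of
    # airflow.cfg. The variable name follows the AIRFLOW__{SECTION}__{OPTION}
    # convention; set it before the Airflow process that should honour it starts.
    import os

    os.environ["AIRFLOW__CORE__EXECUTE_TASKS_NEW_PYTHON_INTERPRETER"] = "True"

    from airflow.configuration import conf

    # The config layer should now report the option as enabled.
    print(conf.getboolean("core", "execute_tasks_new_python_interpreter"))
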
5 changes: 5 additions & 0 deletions airflow/config_templates/default_airflow.cfg
@@ -133,6 +133,11 @@ load_default_connections = True
# Where your Airflow plugins are stored
plugins_folder = {AIRFLOW_HOME}/plugins

# Should tasks be executed via forking of the parent process ("False",
# the speedier option) or by spawning a new python process ("True",
# slower, but it means plugin changes are picked up by tasks straight away)
execute_tasks_new_python_interpreter = False

# Secret key to save connection passwords in the db
fernet_key = {FERNET_KEY}

70 changes: 64 additions & 6 deletions airflow/executors/local_executor.py
@@ -22,12 +22,17 @@
For more information on how the LocalExecutor works, take a look at the guide:
:ref:`executor:LocalExecutor`
"""
import os
import subprocess
from abc import abstractmethod
from multiprocessing import Manager, Process
from multiprocessing.managers import SyncManager
from queue import Empty, Queue # pylint: disable=unused-import # noqa: F401
from typing import Any, List, Optional, Tuple, Union # pylint: disable=unused-import # noqa: F401

from setproctitle import setproctitle # pylint: disable=no-name-in-module

from airflow import settings
from airflow.exceptions import AirflowException
from airflow.executors.base_executor import NOT_STARTED_MESSAGE, PARALLELISM, BaseExecutor, CommandType
from airflow.models.taskinstance import ( # pylint: disable=unused-import # noqa: F401
@@ -51,10 +56,16 @@ class LocalWorkerBase(Process, LoggingMixin):
"""

def __init__(self, result_queue: 'Queue[TaskInstanceStateType]'):
super().__init__(target=self.do_work)
self.daemon: bool = True
self.result_queue: 'Queue[TaskInstanceStateType]' = result_queue

def run(self):
# We know we've just started a new process, so let's disconnect from the metadata db now
settings.engine.pool.dispose()
settings.engine.dispose()
return super().run()

def execute_work(self, key: TaskInstanceKey, command: CommandType) -> None:
"""
Executes command received and stores result state in queue.
@@ -64,14 +75,61 @@ def execute_work(self, key: TaskInstanceKey, command: CommandType) -> None:
"""
if key is None:
return

self.log.info("%s running %s", self.__class__.__name__, command)
if settings.EXECUTE_TASKS_NEW_PYTHON_INTERPRETER:
state = self._execute_work_in_subprocess(command)
else:
state = self._execute_work_in_fork(command)

self.result_queue.put((key, state))

def _execute_work_in_subprocess(self, command: CommandType) -> str:
try:
subprocess.check_call(command, close_fds=True)
return State.SUCCESS
except subprocess.CalledProcessError as e:
self.log.error("Failed to execute task %s.", str(e))
return State.FAILED

def _execute_work_in_fork(self, command: CommandType) -> str:
pid = os.fork()
if pid:
# In parent, wait for the child
pid, ret = os.waitpid(pid, 0)
return State.SUCCESS if ret == 0 else State.FAILED

from airflow.sentry import Sentry
ret = 1
try:
import signal

from airflow.cli.cli_parser import get_parser

signal.signal(signal.SIGINT, signal.SIG_DFL)
signal.signal(signal.SIGTERM, signal.SIG_DFL)

parser = get_parser()
# [1:] - remove "airflow" from the start of the command
args = parser.parse_args(command[1:])

setproctitle(f"airflow task supervisor: {command}")

args.func(args)
ret = 0
return State.SUCCESS
except Exception as e: # pylint: disable=broad-except
self.log.error("Failed to execute task %s.", str(e))
finally:
Sentry.flush()
os._exit(ret) # pylint: disable=protected-access

@abstractmethod
def do_work(self):
"""
Called in the subprocess and should then execute tasks
"""
raise NotImplementedError()


class LocalWorker(LocalWorkerBase):
@@ -91,7 +149,7 @@ def __init__(self,
self.key: TaskInstanceKey = key
self.command: CommandType = command

def do_work(self) -> None:
self.execute_work(key=self.key, command=self.command)


@@ -111,7 +169,7 @@ def __init__(self,
super().__init__(result_queue=result_queue)
self.task_queue = task_queue

def do_work(self) -> None:
while True:
key, command = self.task_queue.get()
try:
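
The fork path above avoids re-importing Airflow for every task: the parent ``waitpid()``s on the child and maps its exit status to a task state, while the child resets signal handlers, runs the CLI entry point, and leaves via ``os._exit`` so the parent's stack is never unwound in two processes. The ``run()`` override also disposes of the inherited SQLAlchemy engine, since database connections must not be shared across a fork. A minimal standalone sketch of the fork-and-wait pattern (Unix-only, like the fork path itself; not Airflow code):

.. code-block:: python

    # Sketch of the fork-and-wait pattern used by _execute_work_in_fork: the child
    # does the work and reports via its exit code; the parent blocks on waitpid()
    # and maps the exit status to success or failure.
    import os
    import sys


    def run_in_fork(work) -> bool:
        pid = os.fork()
        if pid:
            # Parent: wait for the child and inspect its exit status.
            _, status = os.waitpid(pid, 0)
            return os.WEXITSTATUS(status) == 0

        # Child: run the work, then exit without unwinding the parent's stack.
        exit_code = 1
        try:
            work()
            exit_code = 0
        except Exception as exc:  # broad except mirrors the executor's behaviour
            print(f"work failed: {exc}", file=sys.stderr)
        finally:
            os._exit(exit_code)


    if __name__ == "__main__":
        print(run_in_fork(lambda: None))   # True
        print(run_in_fork(lambda: 1 / 0))  # False
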
8 changes: 8 additions & 0 deletions airflow/settings.py
@@ -365,3 +365,11 @@ def initialize():
# to get all the logs from the print & log statements in the DAG files before a task is run
# The handlers are restored after the task completes execution.
DONOT_MODIFY_HANDLERS = conf.getboolean('logging', 'donot_modify_handlers', fallback=False)

CAN_FORK = hasattr(os, "fork")

EXECUTE_TASKS_NEW_PYTHON_INTERPRETER = not CAN_FORK or conf.getboolean(
'core',
'execute_tasks_new_python_interpreter',
fallback=False,
)
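
These two module-level flags encode a simple rule: forking is only an option on platforms that provide ``os.fork`` (it is missing on Windows, for example), so everywhere else the subprocess path is forced regardless of configuration. A small restatement of that decision, with an illustrative helper name that is not part of Airflow:

.. code-block:: python

    # Illustrative restatement of the CAN_FORK / EXECUTE_TASKS_NEW_PYTHON_INTERPRETER
    # logic; the helper name is hypothetical, not Airflow API.
    import os


    def use_new_interpreter(configured: bool) -> bool:
        can_fork = hasattr(os, "fork")      # False on Windows, for example
        return not can_fork or configured   # no fork() available -> always subprocess


    # With fork available the config value decides; without it, True is forced.
    print(use_new_interpreter(False), use_new_interpreter(True))
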
3 changes: 1 addition & 2 deletions airflow/task/task_runner/standard_task_runner.py
@@ -21,11 +21,10 @@
import psutil
from setproctitle import setproctitle # pylint: disable=no-name-in-module

from airflow.settings import CAN_FORK
from airflow.task.task_runner.base_task_runner import BaseTaskRunner
from airflow.utils.process_utils import reap_process_group



class StandardTaskRunner(BaseTaskRunner):
"""
4 changes: 4 additions & 0 deletions docs/configurations-ref.rst
@@ -53,6 +53,10 @@ can set in ``airflow.cfg`` file or using environment variables.
{{ option["description"] }}
{% endif %}

{% if option.get("see_also") %}
.. seealso:: {{ option["see_also"] }}
{% endif %}

:Type: {{ option["type"] }}
:Default: ``{{ "''" if option["default"] == "" else option["default"] }}``
:Environment Variable: ``AIRFLOW__{{ section["name"] | upper }}__{{ option["name"] | upper }}``
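
To see what the new ``see_also`` block emits, the fragment can be rendered on its own with Jinja2 (the template text is taken from the block above; the surrounding document template is omitted):

.. code-block:: python

    # Render only the see_also fragment to show the reST it produces.
    from jinja2 import Template

    fragment = Template(
        '{% if option.get("see_also") %}.. seealso:: {{ option["see_also"] }}{% endif %}'
    )
    print(fragment.render(option={"see_also": ":ref:`plugins:loading`"}))
    # -> .. seealso:: :ref:`plugins:loading`
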
2 changes: 2 additions & 0 deletions docs/modules_management.rst
@@ -228,6 +228,8 @@ Python PATH: [/home/rootcss/venvs/airflow/bin:/usr/lib/python38.zip:/usr/lib/pyt

Below is the sample output of the ``airflow info`` command:

.. seealso:: :ref:`plugins:loading`

.. code-block:: none

Apache Airflow [1.10.11]
19 changes: 19 additions & 0 deletions docs/plugins.rst
@@ -66,6 +66,25 @@ Airflow has many components that can be reused when building an application:
* Airflow is deployed, you can just piggy back on its deployment logistics
* Basic charting capabilities, underlying libraries and abstractions

.. _plugins:loading:

When are plugins (re)loaded?
----------------------------

Plugins are loaded once at the start of every Airflow process, and never reloaded.

This means that if you make any changes to plugins and you want the webserver or scheduler to use that new
code you will need to restart those processes.

By default, task execution uses forking to avoid the slowdown of creating a whole new python interpreter
and re-parsing all of the Airflow code and start-up routines -- a big benefit for shorter-running tasks.
This does mean that if you use plugins in your tasks, and want them to update, you will either need to
restart the worker (if using CeleryExecutor) or the scheduler (LocalExecutor or SequentialExecutor). The
other option is to accept the start-up speed hit and set the ``core.execute_tasks_new_python_interpreter``
config setting to True, which launches a whole new python interpreter for each task.

(Modules only imported by DAG files, on the other hand, do not suffer this problem, as DAG files are not
loaded/parsed in any long-running Airflow process.)

Interface
---------
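
To make the forking caveat above concrete, here is a small sketch of why a forked task process keeps seeing the plugin code its parent already imported, while a fresh interpreter re-imports it from disk (``my_plugin`` is a hypothetical module used only for illustration):

.. code-block:: python

    # Sketch: a forked child inherits the parent's already-imported modules, so
    # editing my_plugin.py on disk after the parent started has no effect on
    # forked children until the parent process is restarted.
    import os

    import my_plugin  # hypothetical plugin module, imported once by the parent

    pid = os.fork()
    if pid == 0:
        # Forked child: uses the in-memory copy loaded by the parent.
        print("child sees:", my_plugin.__file__)
        os._exit(0)

    os.waitpid(pid, 0)
    # A brand-new interpreter (the execute_tasks_new_python_interpreter path)
    # would instead re-import my_plugin from disk and pick up any edits.
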
50 changes: 41 additions & 9 deletions tests/executors/test_local_executor.py
@@ -20,6 +20,8 @@
import unittest
from unittest import mock

from airflow import settings
from airflow.exceptions import AirflowException
from airflow.executors.local_executor import LocalExecutor
from airflow.utils.state import State

@@ -29,9 +31,9 @@ class TestLocalExecutor(unittest.TestCase):
TEST_SUCCESS_COMMANDS = 5

@mock.patch('airflow.executors.local_executor.subprocess.check_call')
def execution_parallelism_subprocess(self, mock_check_call, parallelism=0):
success_command = ['airflow', 'tasks', 'run', 'true', 'some_parameter', '2020-10-07']
fail_command = ['airflow', 'tasks', 'run', 'false', 'task_id', '2020-10-07']

def fake_execute_command(command, close_fds=True): # pylint: disable=unused-argument
if command != success_command:
@@ -41,6 +43,23 @@ def fake_execute_command(command, close_fds=True): # pylint: disable=unused-arg

mock_check_call.side_effect = fake_execute_command

self._test_execute(parallelism, success_command, fail_command)

@mock.patch('airflow.cli.commands.task_command.task_run')
def execution_parallelism_fork(self, mock_run, parallelism=0):
success_command = ['airflow', 'tasks', 'run', 'success', 'some_parameter', '2020-10-07']
fail_command = ['airflow', 'tasks', 'run', 'failure', 'some_parameter', '2020-10-07']

def fake_task_run(args):
if args.dag_id != 'success':
raise AirflowException('Simulate failed task')

mock_run.side_effect = fake_task_run

self._test_execute(parallelism, success_command, fail_command)

def _test_execute(self, parallelism, success_command, fail_command):

executor = LocalExecutor(parallelism=parallelism)
executor.start()

@@ -71,12 +90,25 @@ def fake_execute_command(command, close_fds=True): # pylint: disable=unused-arg
expected = self.TEST_SUCCESS_COMMANDS + 1 if parallelism == 0 else parallelism
self.assertEqual(executor.workers_used, expected)

def test_execution_subprocess_unlimited_parallelism(self):
with mock.patch.object(settings, 'EXECUTE_TASKS_NEW_PYTHON_INTERPRETER',
new_callable=mock.PropertyMock) as option:
option.return_value = True
self.execution_parallelism_subprocess(parallelism=0) # pylint: disable=no-value-for-parameter

def test_execution_subprocess_limited_parallelism(self):
with mock.patch.object(settings, 'EXECUTE_TASKS_NEW_PYTHON_INTERPRETER',
new_callable=mock.PropertyMock) as option:
option.return_value = True
self.execution_parallelism_subprocess(parallelism=2) # pylint: disable=no-value-for-parameter

@mock.patch.object(settings, 'EXECUTE_TASKS_NEW_PYTHON_INTERPRETER', False)
def test_execution_unlimited_parallelism_fork(self):
self.execution_parallelism_fork(parallelism=0) # pylint: disable=no-value-for-parameter

@mock.patch.object(settings, 'EXECUTE_TASKS_NEW_PYTHON_INTERPRETER', False)
def test_execution_limited_parallelism_fork(self):
self.execution_parallelism_fork(parallelism=2) # pylint: disable=no-value-for-parameter

@mock.patch('airflow.executors.local_executor.LocalExecutor.sync')
@mock.patch('airflow.executors.base_executor.BaseExecutor.trigger_tasks')