[AIRFLOW-1641] Handle executor events in the scheduler
While Backfills handle the executor state, the Scheduler does not. In case of an unspecified error (e.g. a timeout or an airflow command failure), tasks can get stuck.

Closes #2715 from bolkedebruin/AIRFLOW-1641

(cherry picked from commit 2abead7)
Signed-off-by: Bolke de Bruin <bolke@xs4all.nl>
bolkedebruin committed Oct 27, 2017
1 parent 6144c6f commit 7354976
Showing 5 changed files with 163 additions and 42 deletions.
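
Before the per-file hunks, a minimal illustrative sketch (not part of the commit) of the reconciliation the scheduler now performs; the names follow the airflow/jobs.py diff below, while error handling and the failure-callback path are omitted:

from airflow import models
from airflow.utils.state import State


def reconcile_executor_events(executor, simple_dag_bag, session):
    # Drain only events for DAGs this scheduler currently manages.
    TI = models.TaskInstance
    buffered = executor.get_event_buffer(simple_dag_bag.dag_ids)
    for (dag_id, task_id, execution_date), state in buffered.items():
        if state not in (State.FAILED, State.SUCCESS):
            continue
        ti = session.query(TI).filter(
            TI.dag_id == dag_id,
            TI.task_id == task_id,
            TI.execution_date == execution_date).first()
        # A QUEUED task that the executor already reports as finished was
        # most likely killed externally; fail it so it does not get stuck.
        if ti and ti.state == State.QUEUED:
            ti.state = State.FAILED
            session.merge(ti)
    session.commit()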
23 changes: 18 additions & 5 deletions airflow/executors/base_executor.py
@@ -141,13 +141,26 @@ def fail(self, key):
def success(self, key):
self.change_state(key, State.SUCCESS)

def get_event_buffer(self):
def get_event_buffer(self, dag_ids=None):
"""
Returns and flush the event buffer
Returns and flushes the event buffer. If dag_ids is specified,
it will only return and flush events for the given dag_ids.
Otherwise it returns and flushes all events.
:param dag_ids: the dag_ids to return events for; if None, returns all
:return: a dict of events
"""
d = self.event_buffer
self.event_buffer = {}
return d
cleared_events = dict()
if dag_ids is None:
cleared_events = self.event_buffer
self.event_buffer = dict()
else:
for key in list(self.event_buffer.keys()):
dag_id, _, _ = key
if dag_id in dag_ids:
cleared_events[key] = self.event_buffer.pop(key)

return cleared_events

def execute_async(self, key, command, queue=None): # pragma: no cover
"""
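For illustration, a minimal sketch (not from the commit) of the new dag_ids filter; the key layout (dag_id, task_id, execution_date) and the State values follow the code above:

from datetime import datetime

from airflow.executors.base_executor import BaseExecutor
from airflow.utils.state import State

executor = BaseExecutor()
date = datetime.utcnow()
executor.event_buffer[("dag_a", "task_1", date)] = State.SUCCESS
executor.event_buffer[("dag_b", "task_1", date)] = State.FAILED

# Only dag_a's event is returned and flushed; dag_b's event stays buffered
# until it is drained by a call that includes it (or by a call with no filter).
events = executor.get_event_buffer(dag_ids=["dag_a"])
assert list(events.keys()) == [("dag_a", "task_1", date)]
assert ("dag_b", "task_1", date) in executor.event_buffer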
52 changes: 41 additions & 11 deletions airflow/jobs.py
@@ -1250,6 +1250,7 @@ def _enqueue_task_instances_with_queued_state(self, simple_dag_bag, task_instanc
TI = models.TaskInstance
# actually enqueue them
for task_instance in task_instances:
simple_dag = simple_dag_bag.get_dag(task_instance.dag_id)
command = " ".join(TI.generate_command(
task_instance.dag_id,
task_instance.task_id,
Expand All @@ -1261,8 +1262,8 @@ def _enqueue_task_instances_with_queued_state(self, simple_dag_bag, task_instanc
ignore_task_deps=False,
ignore_ti_state=False,
pool=task_instance.pool,
file_path=simple_dag_bag.get_dag(task_instance.dag_id).full_filepath,
pickle_id=simple_dag_bag.get_dag(task_instance.dag_id).pickle_id))
file_path=simple_dag.full_filepath,
pickle_id=simple_dag.pickle_id))

priority = task_instance.priority_weight
queue = task_instance.queue
@@ -1372,20 +1373,49 @@ def _process_dags(self, dagbag, dags, tis_out):

models.DagStat.update([d.dag_id for d in dags])

def _process_executor_events(self):
@provide_session
def _process_executor_events(self, simple_dag_bag, session=None):
"""
Respond to executor events.
:param executor: the executor that's running the task instances
:type executor: BaseExecutor
:return: None
"""
for key, executor_state in list(self.executor.get_event_buffer().items()):
# TODO: this shares quite a lot of code with _manage_executor_state

TI = models.TaskInstance
for key, state in list(self.executor.get_event_buffer(simple_dag_bag.dag_ids)
.items()):
dag_id, task_id, execution_date = key
self.log.info(
"Executor reports %s.%s execution_date=%s as %s",
dag_id, task_id, execution_date, executor_state
dag_id, task_id, execution_date, state
)
if state == State.FAILED or state == State.SUCCESS:
qry = session.query(TI).filter(TI.dag_id == dag_id,
TI.task_id == task_id,
TI.execution_date == execution_date)
ti = qry.first()
if not ti:
self.log.warning("TaskInstance %s went missing from the database", ti)
continue

# TODO: should we fail RUNNING as well, as we do in Backfills?
if ti.state == State.QUEUED:
msg = ("Executor reports task instance %s finished (%s) "
"although the task says its %s. Was the task "
"killed externally?".format(ti, state, ti.state))
self.log.error(msg)
try:
simple_dag = simple_dag_bag.get_dag(dag_id)
dagbag = models.DagBag(simple_dag.full_filepath)
dag = dagbag.get_dag(dag_id)
ti.task = dag.get_task(task_id)
ti.handle_failure(msg)
except Exception:
self.log.error("Cannot load the dag bag to handle failure for %s"
". Setting task to FAILED without callbacks or "
"retries. Do you have enough resources?", ti)
ti.state = State.FAILED
session.merge(ti)
session.commit()

def _log_file_processing_stats(self,
known_file_paths,
@@ -1586,8 +1616,8 @@ def _execute_helper(self, processor_manager):
processor_manager.wait_until_finished()

# Send tasks for execution if available
simple_dag_bag = SimpleDagBag(simple_dags)
if len(simple_dags) > 0:
simple_dag_bag = SimpleDagBag(simple_dags)

# Handle cases where a DAG run state is set (perhaps manually) to
# a non-running state. Handle task instances that belong to
Expand Down Expand Up @@ -1615,7 +1645,7 @@ def _execute_helper(self, processor_manager):
self.executor.heartbeat()

# Process events from the executor
self._process_executor_events()
self._process_executor_events(simple_dag_bag)

# Heartbeat the scheduler periodically
time_since_last_heartbeat = (datetime.utcnow() -
42 changes: 16 additions & 26 deletions airflow/utils/dag_processing.py
@@ -36,36 +36,26 @@ class SimpleDag(BaseDag):
required for instantiating and scheduling its associated tasks.
"""

def __init__(self,
dag_id,
task_ids,
full_filepath,
concurrency,
is_paused,
pickle_id):
"""
:param dag_id: ID of the DAG
:type dag_id: unicode
:param task_ids: task IDs associated with the DAG
:type task_ids: list[unicode]
:param full_filepath: path to the file containing the DAG e.g.
/a/b/c.py
:type full_filepath: unicode
:param concurrency: No more than these many tasks from the
dag should run concurrently
:type concurrency: int
:param is_paused: Whether or not this DAG is paused. Tasks from paused
DAGs are not scheduled
:type is_paused: bool
def __init__(self, dag, pickle_id=None):
"""
:param dag: the DAG
:type dag: DAG
:param pickle_id: ID associated with the pickled version of this DAG.
:type pickle_id: unicode
"""
self._dag_id = dag_id
self._task_ids = task_ids
self._full_filepath = full_filepath
self._is_paused = is_paused
self._concurrency = concurrency
self._dag_id = dag.dag_id
self._task_ids = [task.task_id for task in dag.tasks]
self._full_filepath = dag.full_filepath
self._is_paused = dag.is_paused
self._concurrency = dag.concurrency
self._pickle_id = pickle_id
self._task_special_args = {}
for task in dag.tasks:
special_args = {}
if task.task_concurrency is not None:
special_args['task_concurrency'] = task.task_concurrency
if len(special_args) > 0:
self._task_special_args[task.task_id] = special_args

@property
def dag_id(self):
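For illustration, a minimal sketch (not from the commit) of the new constructor; note that the constructor reads dag.is_paused, which in turn queries the metadata database, so this assumes an initialized Airflow environment:

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dag_processing import SimpleDag

dag = DAG(dag_id="example_dag", start_date=datetime(2017, 1, 1))
DummyOperator(task_id="dummy", dag=dag)

# dag_id, task_ids, full_filepath, concurrency and is_paused are now taken
# straight from the DAG object; pickle_id stays optional and defaults to None.
simple_dag = SimpleDag(dag)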
40 changes: 40 additions & 0 deletions tests/executors/test_base_executor.py
@@ -0,0 +1,40 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest

from airflow.executors.base_executor import BaseExecutor
from airflow.utils.state import State

from datetime import datetime


class BaseExecutorTest(unittest.TestCase):
def test_get_event_buffer(self):
executor = BaseExecutor()

date = datetime.utcnow()

key1 = ("my_dag1", "my_task1", date)
key2 = ("my_dag2", "my_task1", date)
key3 = ("my_dag2", "my_task2", date)
state = State.SUCCESS
executor.event_buffer[key1] = state
executor.event_buffer[key2] = state
executor.event_buffer[key3] = state

self.assertEqual(len(executor.get_event_buffer(("my_dag1",))), 1)
self.assertEqual(len(executor.get_event_buffer()), 2)
self.assertEqual(len(executor.event_buffer), 0)

48 changes: 48 additions & 0 deletions tests/jobs.py
@@ -881,6 +881,54 @@ def run_single_scheduler_loop_with_no_dags(dags_folder):
scheduler.heartrate = 0
scheduler.run()

def _make_simple_dag_bag(self, dags):
return SimpleDagBag([SimpleDag(dag) for dag in dags])

def test_process_executor_events(self):
dag_id = "test_process_executor_events"
dag_id2 = "test_process_executor_events_2"
task_id_1 = 'dummy_task'

dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE)
dag2 = DAG(dag_id=dag_id2, start_date=DEFAULT_DATE)
task1 = DummyOperator(dag=dag, task_id=task_id_1)
task2 = DummyOperator(dag=dag2, task_id=task_id_1)

dagbag1 = self._make_simple_dag_bag([dag])
dagbag2 = self._make_simple_dag_bag([dag2])

scheduler = SchedulerJob(**self.default_scheduler_args)
session = settings.Session()

ti1 = TI(task1, DEFAULT_DATE)
ti1.state = State.QUEUED
session.merge(ti1)
session.commit()

executor = TestExecutor()
executor.event_buffer[ti1.key] = State.FAILED

scheduler.executor = executor

# dag bag does not contain dag_id
scheduler._process_executor_events(simple_dag_bag=dagbag2)
ti1.refresh_from_db()
self.assertEqual(ti1.state, State.QUEUED)

# dag bag does contain dag_id
scheduler._process_executor_events(simple_dag_bag=dagbag1)
ti1.refresh_from_db()
self.assertEqual(ti1.state, State.FAILED)

ti1.state = State.SUCCESS
session.merge(ti1)
session.commit()
executor.event_buffer[ti1.key] = State.SUCCESS

scheduler._process_executor_events(simple_dag_bag=dagbag1)
ti1.refresh_from_db()
self.assertEqual(ti1.state, State.SUCCESS)

def test_execute_task_instances_is_paused_wont_execute(self):
dag_id = 'SchedulerJobTest.test_execute_task_instances_is_paused_wont_execute'
task_id_1 = 'dummy_task'
