From 43bdd7a4c876f9ac2d4c357d9c30c0956a1b0d76 Mon Sep 17 00:00:00 2001
From: jlowin
Date: Wed, 13 Apr 2016 19:18:39 -0400
Subject: [PATCH] Handle queued tasks from multiple jobs/executors
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

When the Scheduler is run with `--num-runs`, there can be multiple
Schedulers and Executors all trying to run tasks. For queued tasks, the
Scheduler previously tried to run only the tasks that it itself had queued,
but that doesn't work if the Scheduler is restarting. This PR reverts that
behavior and adds two "best effort" safeguards: before running a TI,
executors check whether it is already running, and before ending, executors
call sync() one last time.
---
 airflow/executors/base_executor.py   | 33 ++++++++++++++------
 airflow/executors/celery_executor.py |  1 +
 airflow/executors/local_executor.py  |  2 +-
 airflow/jobs.py                      | 46 ++++++----------------------
 tests/dags/test_issue_1225.py        | 16 +++++++++-
 tests/jobs.py                        | 20 +++++++++---
 6 files changed, 66 insertions(+), 52 deletions(-)

diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py
index 03075834eabb0..2e88fa9425305 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -30,10 +30,11 @@ def start(self):  # pragma: no cover
         """
         pass
 
-    def queue_command(self, key, command, priority=1, queue=None):
+    def queue_command(self, task_instance, command, priority=1, queue=None):
+        key = task_instance.key
         if key not in self.queued_tasks and key not in self.running:
             self.logger.info("Adding to queue: {}".format(command))
-            self.queued_tasks[key] = (command, priority, queue)
+            self.queued_tasks[key] = (command, priority, queue, task_instance)
 
     def queue_task_instance(
             self,
@@ -54,7 +55,7 @@ def queue_task_instance(
             pool=pool,
             pickle_id=pickle_id)
         self.queue_command(
-            task_instance.key,
+            task_instance,
             command,
             priority=task_instance.task.priority_weight_total,
             queue=task_instance.task.queue)
@@ -67,9 +68,6 @@ def sync(self):
         pass
 
     def heartbeat(self):
-        # Calling child class sync method
-        self.logger.debug("Calling the {} sync method".format(self.__class__))
-        self.sync()
 
         # Triggering new jobs
         if not self.parallelism:
@@ -86,10 +84,27 @@ def heartbeat(self):
             key=lambda x: x[1][1],
             reverse=True)
         for i in range(min((open_slots, len(self.queued_tasks)))):
-            key, (command, priority, queue) = sorted_queue.pop(0)
-            self.running[key] = command
+            key, (command, _, queue, ti) = sorted_queue.pop(0)
+            # TODO(jlowin): without a way to know which Job ran which tasks,
+            # there is a danger that another Job started running a task
+            # that was also queued to this executor. This is the last chance
+            # to check if that happened. The most likely scenario is that a
+            # Scheduler tried to run a task that was originally queued by a
+            # Backfill. This fix reduces the probability of a collision but
+            # does NOT eliminate it.
             self.queued_tasks.pop(key)
-            self.execute_async(key, command=command, queue=queue)
+            ti.refresh_from_db()
+            if ti.state != State.RUNNING:
+                self.running[key] = command
+                self.execute_async(key, command=command, queue=queue)
+            else:
+                self.logger.debug(
+                    'Task is already running, not sending to '
+                    'executor: {}'.format(key))
+
+        # Calling child class sync method
+        self.logger.debug("Calling the {} sync method".format(self.__class__))
+        self.sync()
 
     def change_state(self, key, state):
         self.running.pop(key)
diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py
index 95f3daa0e6fd9..de56baf11ad11 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -95,3 +95,4 @@ def end(self, synchronous=False):
                     async.state not in celery_states.READY_STATES
                     for async in self.tasks.values()]):
                 time.sleep(5)
+        self.sync()
diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py
index f13ee6d1358d3..24ef6c69cac96 100644
--- a/airflow/executors/local_executor.py
+++ b/airflow/executors/local_executor.py
@@ -73,4 +73,4 @@ def end(self):
         [self.queue.put((None, None)) for w in self.workers]
         # Wait for commands to finish
         self.queue.join()
-
+        self.sync()
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 0b006448c90c6..8ea66731cf7e9 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -238,7 +238,6 @@ def __init__(
 
         self.refresh_dags_every = refresh_dags_every
         self.do_pickle = do_pickle
-        self.queued_tis = set()
         super(SchedulerJob, self).__init__(*args, **kwargs)
 
         self.heartrate = conf.getint('scheduler', 'SCHEDULER_HEARTBEAT_SEC')
@@ -567,47 +566,22 @@ def process_dag(self, dag, queue):
 
         session.close()
 
-    def process_events(self, executor, dagbag):
-        """
-        Respond to executor events.
-
-        Used to identify queued tasks and schedule them for further
-        processing.
-        """
-        for key, executor_state in list(executor.get_event_buffer().items()):
-            dag_id, task_id, execution_date = key
-            if dag_id not in dagbag.dags:
-                self.logger.error(
-                    'Executor reported a dag_id that was not found in the '
-                    'DagBag: {}'.format(dag_id))
-                continue
-            elif not dagbag.dags[dag_id].has_task(task_id):
-                self.logger.error(
-                    'Executor reported a task_id that was not found in the '
-                    'dag: {} in dag {}'.format(task_id, dag_id))
-                continue
-            task = dagbag.dags[dag_id].get_task(task_id)
-            ti = models.TaskInstance(task, execution_date)
-            ti.refresh_from_db()
-
-            if executor_state == State.SUCCESS:
-                # collect queued tasks for prioritiztion
-                if ti.state == State.QUEUED:
-                    self.queued_tis.add(ti)
-            else:
-                # special instructions for failed executions could go here
-                pass
-
     @provide_session
     def prioritize_queued(self, session, executor, dagbag):
         # Prioritizing queued task instances
         pools = {p.pool: p for p in session.query(models.Pool).all()}
-
+        TI = models.TaskInstance
+        queued_tis = (
+            session.query(TI)
+            .filter(TI.state == State.QUEUED)
+            .all()
+        )
         self.logger.info(
-            "Prioritizing {} queued jobs".format(len(self.queued_tis)))
+            "Prioritizing {} queued jobs".format(len(queued_tis)))
         session.expunge_all()
         d = defaultdict(list)
-        for ti in self.queued_tis:
+        for ti in queued_tis:
             if ti.dag_id not in dagbag.dags:
                 self.logger.info(
                     "DAG no longer in dagbag, deleting {}".format(ti))
@@ -621,8 +595,6 @@ def prioritize_queued(self, session, executor, dagbag):
             else:
                 d[ti.pool].append(ti)
 
-        self.queued_tis.clear()
-
         dag_blacklist = set(dagbag.paused_dags())
         for pool, tis in list(d.items()):
             if not pool:
@@ -676,6 +648,7 @@ def prioritize_queued(self, session, executor, dagbag):
                         open_slots -= 1
                     else:
                         session.delete(ti)
+                        session.commit()
                         continue
                     ti.task = task
 
@@ -721,7 +694,6 @@ def _execute(self):
             try:
                 loop_start_dttm = datetime.now()
                 try:
-                    self.process_events(executor=executor, dagbag=dagbag)
                     self.prioritize_queued(executor=executor, dagbag=dagbag)
                 except Exception as e:
                     self.logger.exception(e)
diff --git a/tests/dags/test_issue_1225.py b/tests/dags/test_issue_1225.py
index 898cc04991eb9..ecfa64635ef85 100644
--- a/tests/dags/test_issue_1225.py
+++ b/tests/dags/test_issue_1225.py
@@ -23,6 +23,8 @@
 from airflow.models import DAG
 from airflow.operators import DummyOperator, PythonOperator, SubDagOperator
 from airflow.utils.trigger_rule import TriggerRule
+import time
+
 DEFAULT_DATE = datetime(2016, 1, 1)
 default_args = dict(
     start_date=DEFAULT_DATE,
@@ -31,6 +33,16 @@ def fail():
     raise ValueError('Expected failure.')
 
+def delayed_fail():
+    """
+    Delayed failure to make sure that processes are running before the error
+    is raised.
+
+    TODO handle more directly (without sleeping)
+    """
+    time.sleep(5)
+    raise ValueError('Expected failure.')
+
 
 # DAG tests backfill with pooled tasks
 # Previously backfill would queue the task but never run it
 dag1 = DAG(dag_id='test_backfill_pooled_task_dag', default_args=default_args)
@@ -123,7 +135,9 @@ def fail():
     end_date=DEFAULT_DATE,
     default_args=default_args)
 dag8_task1 = PythonOperator(
-    python_callable=fail,
+    # use delayed_fail because otherwise LocalExecutor will have a chance to
+    # complete the task
+    python_callable=delayed_fail,
     task_id='test_queued_task',
     dag=dag8,
     pool='test_queued_pool')
diff --git a/tests/jobs.py b/tests/jobs.py
index bc815e8145287..6802aae504723 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -23,11 +23,13 @@
 from airflow import AirflowException, settings
 from airflow.bin import cli
+from airflow.executors import DEFAULT_EXECUTOR
 from airflow.jobs import BackfillJob, SchedulerJob
-from airflow.models import DagBag, DagRun, Pool, TaskInstance as TI
+from airflow.models import DAG, DagBag, DagRun, Pool, TaskInstance as TI
+from airflow.operators import DummyOperator
+from airflow.utils.db import provide_session
 from airflow.utils.state import State
 from airflow.utils.timeout import timeout
-from airflow.utils.db import provide_session
 from airflow import configuration
 
 configuration.test_mode()
@@ -283,15 +285,25 @@ def test_scheduler_pooled_tasks(self):
         dag = self.dagbag.get_dag(dag_id)
         dag.clear()
 
-        scheduler = SchedulerJob(dag_id, num_runs=10)
+        scheduler = SchedulerJob(dag_id, num_runs=1)
         scheduler.run()
 
         task_1 = dag.tasks[0]
         logging.info("Trying to find task {}".format(task_1))
         ti = TI(task_1, dag.start_date)
         ti.refresh_from_db()
-        self.assertEqual(ti.state, State.FAILED)
+        self.assertEqual(ti.state, State.QUEUED)
 
+        # now we use a DIFFERENT scheduler and executor
+        # to simulate the num-runs CLI arg
+        scheduler2 = SchedulerJob(
+            dag_id,
+            num_runs=5,
+            executor=DEFAULT_EXECUTOR.__class__())
+        scheduler2.run()
+
+        ti.refresh_from_db()
+        self.assertEqual(ti.state, State.FAILED)
         dag.clear()
 
     def test_dagrun_deadlock_ignore_depends_on_past_advance_ex_date(self):
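
Note on the pattern above (illustration, not part of the patch): the
heartbeat() change is a "check then run" guard. Below is a minimal,
self-contained Python sketch of that pattern. `DB`, `fetch_state_from_db`,
and this `heartbeat` signature are hypothetical stand-ins for the metadata
database, TaskInstance.refresh_from_db(), and the executor internals; they
are not Airflow APIs.

    RUNNING, QUEUED = 'running', 'queued'

    # Hypothetical stand-in for the metadata DB, keyed like
    # TaskInstance.key: (dag_id, task_id, execution_date).
    DB = {('dag1', 'task1', '2016-01-01'): QUEUED}

    def fetch_state_from_db(key):
        # Stand-in for ti.refresh_from_db() followed by reading ti.state.
        return DB.get(key)

    def heartbeat(queued_tasks, running, execute_async, open_slots):
        # Highest priority first, mirroring the sort in BaseExecutor.heartbeat.
        ordered = sorted(queued_tasks.items(),
                         key=lambda item: item[1][1], reverse=True)
        for key, (command, _priority, queue, _ti) in ordered[:open_slots]:
            queued_tasks.pop(key)
            # Last-chance check: another Job may have started this task.
            # The window between this check and execute_async() stays open,
            # which is why the commit message calls it "best effort".
            if fetch_state_from_db(key) != RUNNING:
                running[key] = command
                execute_async(key, command, queue)
            else:
                print('Task already running, skipping: {}'.format(key))

    # Example: the task launches because the DB still reports it as QUEUED.
    queued = {('dag1', 'task1', '2016-01-01'):
              ('airflow run dag1 task1 2016-01-01', 1, None, None)}
    heartbeat(queued, {}, lambda k, c, q: print('launching', k), open_slots=32)

The same open window motivates the second safeguard: calling sync() once
more in end() lets an executor collect final state for tasks that were
still in flight at the Job's last heartbeat.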