
[AIRFLOW-160] Parse DAG files through child processes #1559

Conversation

@plypaul (Contributor) commented May 31, 2016

This preliminary PR addresses the following issue:
https://issues.apache.org/jira/browse/AIRFLOW-160

As referenced in the JIRA issue, this PR aims to parse user DAG files in child processes to make the scheduler more robust. The main change is in SchedulerJob._execute(). The overall scheduling flow is:

In the main process

  • Scan the DAG directory and create a list of paths to potential DAG definition files
  • Create child processes, one per file, with at most N running at once

In the child process

  • Figure out the task instances that should be run for DAGs in the file
  • Create those task instances in the ORM in the QUEUED state

In the main process

  • As processes finish, query ORM for task instances in the QUEUED state
  • Prioritize task instances and send them to the executor
  • Repeat continuously for the specified duration
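The flow above can be sketched roughly as follows. This is an illustrative outline only, not the PR's actual code; names like find_dag_files, process_file, and MAX_PROCESSES are hypothetical.

```python
import multiprocessing
import os
import time

MAX_PROCESSES = 4  # at most N child processes at once (hypothetical cap)

def find_dag_files(dag_folder):
    """Scan the DAG directory for potential DAG definition files."""
    paths = []
    for root, _, files in os.walk(dag_folder):
        for name in files:
            if name.endswith(".py"):
                paths.append(os.path.join(root, name))
    return sorted(paths)

def process_file(path):
    """Child process: parse the file and create task instances in QUEUED state."""
    pass  # placeholder for the per-file scheduling work

def run_scheduler_loop(dag_folder, run_duration):
    """Main process: fan out one child per file, capped at MAX_PROCESSES."""
    deadline = time.time() + run_duration
    pending = find_dag_files(dag_folder)
    active = []
    while time.time() < deadline and (pending or active):
        # Start a child process per file, but only MAX_PROCESSES at once
        while pending and len(active) < MAX_PROCESSES:
            p = multiprocessing.Process(target=process_file, args=(pending.pop(),))
            p.start()
            active.append(p)
        # As children finish, the real scheduler queries the ORM for QUEUED
        # task instances, prioritizes them, and sends them to the executor.
        active = [p for p in active if p.is_alive()]
        time.sleep(0.1)
```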

Some more notes:

  • The DagFileProcessor class creates the child process.
  • Due to serialization issues, a full DAG object isn't passed from the child process to the main process; instead, a simplified DAG object containing only the fields necessary for scheduling is passed.
  • A DAG file that's slow to parse / schedule doesn't block other DAG files from being scheduled (provided there is sufficient parallelism)
  • There is a user-specified limit on how frequently DAG files are parsed.
  • Tasks have to be sent to the executor from the main process as it's necessary to coordinate pools and prioritization of task instances.
  • Removed the option in DagBag to sync to the DB; DAGs are now synced to the DB in a separate call.
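
As a rough illustration of the simplified-DAG note above, the child process can hand back a plain picklable summary instead of the full DAG object. The field names here are hypothetical, not necessarily those used in the PR:

```python
from collections import namedtuple

# A plain, picklable stand-in for a full DAG (illustrative fields only)
SimpleDag = namedtuple(
    "SimpleDag",
    ["dag_id", "task_ids", "full_filepath", "concurrency", "is_paused"],
)

def simplify(dag):
    """Reduce a full DAG object to a picklable summary for the main process."""
    return SimpleDag(
        dag_id=dag.dag_id,
        task_ids=[t.task_id for t in dag.tasks],
        full_filepath=dag.full_filepath,
        concurrency=dag.concurrency,
        is_paused=dag.is_paused,
    )
```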

Example log output:

[2016-05-31 06:53:11,048] {jobs.py:1132} INFO - Searching for files in /home/paul_yang/airflow/dags
[2016-05-31 06:53:11,048] {jobs.py:1135} INFO - There are 2 files in /home/paul_yang/airflow/dags
[2016-05-31 06:53:11,048] {jobs.py:1179} INFO - Heartbeating the process manager
[2016-05-31 06:53:11,051] {scheduler.py:562} INFO - Started a process (PID: 9612) to generate tasks for /home/paul_yang/airflow/dags/tutorial1.py - logging into /tmp/airflow/scheduler/logs/2016-05-31/tutorial1.py.log
[2016-05-31 06:53:11,054] {scheduler.py:562} INFO - Started a process (PID: 9613) to generate tasks for /home/paul_yang/airflow/dags/tutorial2.py - logging into /tmp/airflow/scheduler/logs/2016-05-31/tutorial2.py.log
[2016-05-31 06:53:11,054] {jobs.py:1189} INFO - Heartbeating the executor
[2016-05-31 06:53:11,055] {jobs.py:1061} INFO -
================================================================================
DAG File Processing Stats

File Path                                    PID  Runtime      Last Runtime    Last Run
-----------------------------------------  -----  ---------  --------------  ----------
/home/paul_yang/airflow/dags/tutorial1.py   9612  0.00s
/home/paul_yang/airflow/dags/tutorial2.py   9613  0.00s
================================================================================
[2016-05-31 06:53:12,056] {jobs.py:1179} INFO - Heartbeating the process manager
[2016-05-31 06:53:12,057] {scheduler.py:501} INFO - Processor for /home/paul_yang/airflow/dags/tutorial1.py finished
[2016-05-31 06:53:12,062] {jobs.py:848} INFO - Queued tasks up for execution:
    <TaskInstance: paul_tutorial1.print_date 2016-01-01 01:30:00 [queued]>
[2016-05-31 06:53:12,064] {jobs.py:871} INFO - Figuring out tasks to run in Pool(name=None) with 128 open slots and 1 task instances in queue
[2016-05-31 06:53:12,065] {jobs.py:914} INFO - DAG paul_tutorial1 has 0/16 running tasks
[2016-05-31 06:53:12,065] {jobs.py:938} INFO - Sending to executor (u'paul_tutorial1', u'print_date', datetime.datetime(2016, 1, 1, 1, 30)) with priority 1 and queue default

@plypaul force-pushed the plypaul_schedule_by_file_rebase_1.7.1 branch from 3f47717 to 3d95a79 on May 31, 2016 07:29
@@ -459,39 +700,24 @@ def schedule_dag(self, dag):
session.commit()
return next_run

def process_dag(self, dag, queue):
def create_task_instances(self, dag, queue):
Contributor:

This should probably be called process_task_instances; task instances are eagerly created, so the current name does not reflect what is happening here.

@bolkedebruin (Contributor):

I do have some general remarks that might be smarter to discuss in person so we can align. My concern is that DagRuns do not seem to be really used; the scheduling seems to rely on TaskInstances instead. From a first review, there is also some divergence from the work @jlowin and I have been doing.

(I do really like spinning off the DAG handling into separate processes, don't get me wrong!)

ti = TI(task, ti_key[2])
# Task starts out in the queued state. All tasks in the queued
# state will be scheduled later in the execution loop.
ti.state = State.QUEUED
Member:

This is a big change that alters the semantics of QUEUED. There may be places in the code that check for State.NONE (or state is None) that could break.

Contributor:

Indeed. I wonder what the rationale is for starting to use QUEUED for this?

Contributor (author):

I had planned to define QUEUED as the state where a task instance needs to be sent to the executor (or has already been sent). What are the use cases for the NONE state? I searched for references to it, but didn't have much luck.

@criccomini (Contributor) Jun 2, 2016:

@plypaul you might have a look at https://cwiki.apache.org/confluence/display/AIRFLOW/Scheduler+Basics where @bolkedebruin took a shot at documenting the states a bit. If things are insufficient in that doc, we should update it.

Contributor (author):

Yeah, I'm lacking clarity around the NONE state and its purpose. One other strange thing I saw was how task instances with pools get created in the NONE state, get sent to the executor, run, and then put themselves in the QUEUED state. See:

https://github.com/apache/incubator-airflow/blob/c2384cb41b4a10ad97124fe9a92df0a376a208c2/airflow/models.py#L1236

After that, they get run again.

Contributor:

The pool stuff is pretty jacked. See this JIRA for a related discussion:

https://issues.apache.org/jira/browse/AIRFLOW-205

Member:

(A quick overview of task states. This isn't necessarily how things should be but it is how they are :) )

Indeed -- all tasks used to start with State.NONE. @bolkedebruin's latest PR introduces State.SCHEDULED when the TI has been created by the scheduler but not yet run. When the executor loads a TI, it eventually calls TI.run() which does one of two things:

  1. If the TI can be run, it sets State.RUNNING and calls the execute() method.
  2. If the TI has a pool, it sets State.QUEUED and returns.

The scheduler has a prioritize_queued method which loads up all the queued tasks and tries to run them if there are slots available in their respective pools. That second run is the one that actually moves pooled tasks to State.RUNNING.
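
A toy sketch of that state flow, with placeholder classes only; the real logic lives in TaskInstance.run() in airflow/models.py, and the "already queued" check here is an assumption for illustration:

```python
class State:
    NONE = None
    QUEUED = "queued"
    RUNNING = "running"

class TaskInstance:
    def __init__(self, pool=None):
        self.pool = pool
        self.state = State.NONE

    def run(self):
        # First pass for a pooled TI: park it as QUEUED and return; the
        # scheduler's prioritize_queued() re-runs it when a slot frees up.
        if self.pool and self.state != State.QUEUED:
            self.state = State.QUEUED
            return
        # Otherwise the TI can run: mark RUNNING (execute() would follow here).
        self.state = State.RUNNING

ti = TaskInstance(pool="etl_pool")
ti.run()  # first run: pooled, so parked as QUEUED
ti.run()  # second run (via prioritize_queued): moves to RUNNING
```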

Contributor:

SCHEDULED only gets set when the scheduler does a handover to the executor, and only for TIs that were NONE before. See also the wiki.

@jlowin (Member) commented May 31, 2016

A lot of good work here, thanks @plypaul. It does need some realignment with the current push toward DagRun centralization but I think a lot of the core ideas are compatible.

@@ -776,10 +795,10 @@ class CLIFactory(object):
default=False),
# scheduler
'dag_id_opt': Arg(("-d", "--dag_id"), help="The id of the dag to run"),
'num_runs': Arg(
("-n", "--num_runs"),
'run_duration': Arg(
Member:

Commands that didn't fail before will fail now; we'll need to put that in the header of the next release notes.

Contributor (author):

I could re-add this feature for now, and potentially remove it in the next major version?

@plypaul force-pushed the plypaul_schedule_by_file_rebase_1.7.1 branch 3 times, most recently from c15982c to 43cb91a on June 2, 2016 23:02
@criccomini (Contributor):

Does this PR have sufficient test coverage?

@plypaul (author) commented Jun 2, 2016

The tests need some work at the moment - I will update when they're in a better state.

@plypaul force-pushed the plypaul_schedule_by_file_rebase_1.7.1 branch 2 times, most recently from a595c10 to c6030c8 on June 3, 2016 02:55
class DagFileProcessor(AbstractDagFileProcessor):
"""Helps call SchedulerJob.process_file() in a separate process."""

# Counter that increments every an instance of this class is created
Contributor:

every time

@bolkedebruin (Contributor):

Great work! All in all I'd prefer a DagRun-based approach, and not using the QUEUED state here but rather SCHEDULED when the handover to the executor occurs. Additionally, I think the shallow objects need a real relationship with the full objects; otherwise we will end up with code maintenance issues. If we could get rid of the shallow objects, that would be even better.

@criccomini (Contributor):

Landscape shows 58 new PEP issues, and there appear to be some sketchy non-deterministic failures from Travis:

======================================================================
FAIL: Test that the scheduler handles queued tasks correctly
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/travis/build/apache/incubator-airflow/tests/jobs.py", line 312, in test_scheduler_pooled_tasks
    self.assertEqual(ti.state, State.FAILED)
AssertionError: 'queued' != u'failed'

@bolkedebruin (Contributor):

@plypaul the error @criccomini is seeing with tests increases my concerns about the changed semantics of QUEUED. In addition, the tests need to be extended to cover the new functionality you are creating, not just update the old tests. For instance, I would like to see a test that has a faulty DAG and checks how the scheduler recovers from it.

@plypaul (author) commented Jun 3, 2016

Yeah, testing is in progress, and I'll post a comment once those are updated.

@plypaul force-pushed the plypaul_schedule_by_file_rebase_1.7.1 branch 3 times, most recently from ccb5f64 to 79a1159 on June 3, 2016 23:39
@plypaul force-pushed the plypaul_schedule_by_file_rebase_1.7.1 branch 7 times, most recently from 5db183e to aae2ba8 on June 17, 2016 03:38
self.do_pickle = do_pickle
super(SchedulerJob, self).__init__(*args, **kwargs)

self.logger.error("Executor is {}".format(self.executor.__class__))
Contributor:

logger.info

@bolkedebruin (Contributor):

@plypaul No, I actually mean non-serialized. The DAG files themselves are not that big, and we are not pickling for state, right? So that raises the question: why serialize at all?


dag_folder = dag_folder or DAGS_FOLDER
self.logger.error("Using dag_folder {}".format(dag_folder))
Contributor:

logger.info

@plypaul force-pushed the plypaul_schedule_by_file_rebase_1.7.1 branch 3 times, most recently from 8808bdd to 0fc11f8 on June 22, 2016 04:03
@plypaul (author) commented Jun 22, 2016

@bolkedebruin Sending around the contents of the DAG file isn't sufficient, as it could import modules that aren't available on the host.

@bolkedebruin (Contributor):

With the zipped DAGs it actually is, because they can contain modules.

@artwr (Contributor) commented Jun 22, 2016

@bolkedebruin, isn't that the case iff the modules do not rely on external system libraries?

@bolkedebruin (Contributor):

If you mean that the modules in the zip require any native dependencies (.so) to be present on the host system then yes.

If your hosts are architecturally the same then you should be able to package almost anything into the zipped dag.

@bolkedebruin (Contributor):

We could even extend that by allowing .so files in the package and using LD_PRELOAD.

@bolkedebruin (Contributor):

(So you should be able to shoot numpy around.)

Another way that comes to mind (sorry, typing on a phone) is to use the wheel format and let pip resolve dependencies for DAGs before we execute a task. Doing that in a virtualenv / Docker will not require root access.

@bolkedebruin (Contributor):

Doing that in a virtualenv per task will add a bit of time to the startup of a task but will allow versioning of dependencies.
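
The per-task virtualenv idea above could be sketched like this. Everything here is illustrative: venv_bin and the run_task entry point are hypothetical, and this assumes a POSIX layout and Python's stdlib venv module rather than anything in Airflow.

```python
import os
import subprocess
import sys
import tempfile

def venv_bin(env_dir, name):
    """Path to an executable inside a POSIX virtualenv."""
    return os.path.join(env_dir, "bin", name)

def run_task_in_virtualenv(wheel_path, task_args):
    """Build a fresh venv, let pip resolve the DAG's dependencies from a
    wheel, then run the task inside it (run_task is a hypothetical module)."""
    env_dir = tempfile.mkdtemp(prefix="task-env-")
    # Create the virtualenv; no root access required
    subprocess.check_call([sys.executable, "-m", "venv", env_dir])
    # pip resolves and installs the task's dependencies from the wheel
    subprocess.check_call([venv_bin(env_dir, "pip"), "install", wheel_path])
    # Run the task with the env's own interpreter
    subprocess.check_call([venv_bin(env_dir, "python"), "-m", "run_task"]
                          + list(task_args))
```

As noted above, this trades a slower task startup for versioned, per-task dependencies.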

@plypaul force-pushed the plypaul_schedule_by_file_rebase_1.7.1 branch 2 times, most recently from ace7aa2 to a389019 on June 23, 2016 01:32
Instead of parsing the DAG definition files in the same process as the
scheduler, this change parses the files in a child process. This helps
to isolate the scheduler from bad user code.
@plypaul force-pushed the plypaul_schedule_by_file_rebase_1.7.1 branch 2 times, most recently from 2e03b4f to 084196a on June 24, 2016 18:54
@plypaul closed this Jun 27, 2016
@plypaul deleted the plypaul_schedule_by_file_rebase_1.7.1 branch June 27, 2016 18:59