
[AIRFLOW-69] Make backfill use dagruns #1667

Closed · 1 commit

Conversation

@bolkedebruin (Contributor)

Dear Airflow Maintainers,

Please accept this PR that addresses the following issues:

Testing Done:

  • Updated unittests; backfill examples actually get fired off now (they did not before)
  • Separated off the example_trigger_controller_dag and tested its functionality.

Backfill jobs create taskinstances without any associated
DagRuns. This creates consistency errors. This patch addresses
this issue and also makes the scheduler backfill aware.

It doesn't deal with the remaining issue that backfills can be
scheduled on top of existing dag runs, and that, due to this,
TaskInstances can point to multiple DagRuns
(this is already the case today).
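The core idea above (every backfilled execution date owns a DagRun, and TaskInstances attach to it instead of floating free) can be sketched as follows. This is a toy model with illustrative names (`get_or_create_dag_run`, dict-based "models"), not the real Airflow code:

```python
from datetime import datetime, timedelta

def get_or_create_dag_run(runs, execution_date):
    """Reuse an existing run for this execution_date, else create a backfill one."""
    if execution_date not in runs:
        runs[execution_date] = {
            "run_id": "backfill_" + execution_date.isoformat(),
            "task_instances": [],
        }
    return runs[execution_date]

runs = {}
start = datetime(2016, 8, 1)
for day in range(3):
    date = start + timedelta(days=day)
    run = get_or_create_dag_run(runs, date)
    # Each TaskInstance is now created under a concrete DagRun,
    # which is the consistency guarantee this patch adds.
    run["task_instances"].append(("runme_0", date))
```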

@plypaul @jlowin @aoen please give it your thoughts

@bolkedebruin bolkedebruin force-pushed the backfill_dagrun branch 3 times, most recently from 5bc21c9 to 14f88b1 Compare July 15, 2016 07:57
    # make sure backfills are also considered
    last_run = dag.get_last_dagrun(session=session)
    if last_run:
        if next_run_date:

Try this:

    if last_run and next_run_date:
        ...


codecov-io commented Jul 15, 2016

Current coverage is 64.72% (diff: 82.14%)

Merging #1667 into master will increase coverage by 0.31%

@@             master      #1667   diff @@
==========================================
  Files           126        127     +1   
  Lines          9407       9481    +74   
  Methods           0          0          
  Messages          0          0          
  Branches          0          0          
==========================================
+ Hits           6059       6137    +78   
+ Misses         3348       3344     -4   
  Partials          0          0          

Powered by Codecov. Last update 2c3d0fd...13c452f

    tasks_to_run[ti.key] = ti
    session.merge(ti)
    session.commit()
    active_dag_runs = []
@jgao54 Jul 20, 2016

Should the backfill job honor a dag's max_active_runs? For a subdag it wouldn't make much sense to honor it, since it is part of a dag. Would it matter for backfilling dags?

@bolkedebruin (Contributor, Author)

I'm not sure. Backfills run locally in the process of the user executing them. So as long as backfills are not really scheduled, I would say no. What are your thoughts @r39132 @aoen @jlowin?

@aoen (Contributor) Jul 20, 2016

I think backfill should honor max active dag runs because, for example, max active dag runs is one way of limiting concurrency to external resources (pools being another). My dependency-refactor PR already makes this the case. For subdags, max dag runs should be unlimited by default.
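What honoring max_active_runs in a backfill could look like can be sketched as below. The cap-and-wait logic is hypothetical (a deque standing in for a blocking wait), not the PR's actual code:

```python
from collections import deque

def run_backfill_window(dates, max_active_runs):
    """Start one run per date, but never hold more than max_active_runs
    open at once; when the cap is hit, the oldest run must finish first."""
    active = deque()
    finished = []
    peak = 0
    for date in dates:
        if len(active) >= max_active_runs:
            finished.append(active.popleft())  # stand-in for waiting on a slot
        active.append(date)
        peak = max(peak, len(active))
    finished.extend(active)  # backfill waits for everything before exiting
    return finished, peak

dates = ["2016-08-0%d" % d for d in range(1, 6)]
finished, peak = run_backfill_window(dates, max_active_runs=2)
```

The trade-off discussed later in this thread is visible here: waiting for a slot to free up can take arbitrarily long when runs are slow.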

@bolkedebruin (Contributor, Author)

Ah, that makes sense. I'm not sure whether I can tell at run time that we are inside a SubDagOperator or not, but let's find out. Any suggestions on how to report this back to the user? Just skip it?

On a side note @aoen, I could probably make backfill use the scheduler quite easily. However, the side effect would be that backfills don't necessarily run at their point of origin. Is that a concern for you guys?

@aoen (Contributor) Jul 20, 2016

The subdag operator passing a flag to the backfill command would be one way to do it. Another would be to have the subdag operator mutate the subdag's max active dag runs. Personally I think subdags should be first-class citizens in Airflow (as opposed to an operator that shells out to run a separate airflow CLI command), but that is for another time.

I think that making the backfill use the scheduler is definitely the place we want to end up. One problem I can see for existing Airflow use cases is that it would make it hard to iterate on and test dags, since you could not pass a local copy of a dag into a backfill. Once this problem is solved I can't think of any other blockers (at least for Airbnb). Maybe the backfill command could take an optional DAG folder argument which it would send to the workers somehow. Actually, now that I think about it, this would become easy to do in the upcoming git time-machine model (users could push their dags to a branch and then specify the branch in the backfill command).

Note that this end-goal can already be achieved by using the same executor in the airflow.cfg on the backfill machine as the scheduler uses.


@aoen the 'git time-machine model' is very interesting! 👍

jgao54 commented Jul 20, 2016

@bolkedebruin made 2 comments, everything else lgtm :)

@bolkedebruin bolkedebruin force-pushed the backfill_dagrun branch 3 times, most recently from 9afdbc5 to c5ef2ae Compare July 22, 2016 11:14
@bolkedebruin (Contributor, Author)

@jgao54 @aoen I adjusted how it works; max_dag_run is now respected.

jgao54 commented Jul 23, 2016

@bolkedebruin nice catch on filtering out 'backfill_' jobs in the scheduler.

A minor concern I have is that although max_active_run is respected by backfill, backfill itself can still create more active dagruns than max_active_run.

@aoen mentioned that the end goal is to make the backfill use the scheduler. How I interpreted it: instead of having backfill do everything locally and independently (the way it is now), BackfillJob would only create dagruns, and SchedulerJob (the scheduler) would pick them up from the db along with other scheduled dagruns and process all of them in a consistent manner. I am not sure if this interpretation is correct, but if it is, it may be beyond the scope of this PR anyway. So I am content =P

@bolkedebruin (Contributor, Author)

@jgao54 good point. Enforcing the max number of dag runs in backfills creates a bit of an issue, though: since the BackfillJob waits on all runs to finish before it exits, should it then wait for active_dag_runs < max_dag_runs, which can take arbitrarily long?

BackfillJobs should indeed be scheduled, but although the change to the BackfillJob logic is relatively small, all tests would need to be updated. That will take a bit longer and is outside the scope of this PR.

jgao54 commented Jul 23, 2016

@bolkedebruin yeah, agree that without using the scheduler, starvation is theoretically possible with backfills.

I personally think that is an okay temporary sacrifice in order to respect max_active_run, since in most cases a dag only has a single dagrun running, if at all (and the default config for max_active_run is 16).

jgao54 commented Jul 31, 2016

@bolkedebruin just coming back to this PR again... since the existing backfill doesn't respect max active runs anyway, I am convinced that it's more valuable to merge this commit now and refactor the scheduler in a separate PR.

@bolkedebruin (Contributor, Author)

@aoen @plypaul please review this PR to see if it is committable. Thanks!

@bolkedebruin bolkedebruin force-pushed the backfill_dagrun branch 3 times, most recently from 0704498 to 9606066 Compare August 6, 2016 16:37
@bolkedebruin (Contributor, Author)

@plypaul @aoen updated to work with @plypaul's patches; tests fully pass here: https://travis-ci.org/bolkedebruin/airflow

Can you please verify? Thanks!

    tasks_to_run.pop(key)
    session.commit()
    continue
    # set requiered transient field

Nit: typo

@bolkedebruin (Contributor, Author)

Done.


plypaul commented Aug 8, 2016

One thing that I realized with the backfill is that if we create DAG runs, the current scheduler will pick up tasks and send them to the executor in this call:

https://github.com/apache/incubator-airflow/blob/master/airflow/jobs.py#L892

To avoid that, we could check the state of the DAG run in that function and make sure that it's not a backfill.
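Combined with the run_id filtering discussed later in this thread, the suggested guard could look roughly like this. It's a sketch with illustrative dict-based runs, not the actual jobs.py code:

```python
BACKFILL_PREFIX = "backfill_"  # run_id convention for backfill-owned runs

def runs_for_scheduler(dag_runs):
    """Hand the executor only runs that the backfill process does not own."""
    return [run for run in dag_runs
            if not run["run_id"].startswith(BACKFILL_PREFIX)]

dag_runs = [
    {"run_id": "scheduled__2016-08-01T00:00:00"},
    {"run_id": "backfill_2016-08-02T00:00:00"},
]
# Only the scheduled run reaches the executor; the backfill run is left
# to the backfill process that created it.
eligible = runs_for_scheduler(dag_runs)
```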


jlowin commented Aug 9, 2016

Sorry to be joining the conversation so late. I've been testing the PR, and as long as I have a scheduler running it works great. However, when there is no scheduler running, I get weird behavior: the backfill quickly iterates through all requested DagRuns and claims they are successful when in reality the DagRuns aren't being created (they don't seem to be in the database) and the TaskInstances aren't either.

Here's the log output of backfilling one of the example DAGs without a scheduler running:

airflow backfill example_bash_operator -s 2016-08-01 -e 2016-08-08                                              21:17:58
[2016-08-08 21:20:16,995] {__init__.py:50} INFO - Using executor SequentialExecutor
[2016-08-08 21:20:17,293] {models.py:162} INFO - Filling up the DagBag from /Users/jlowin/airflow/dags
[2016-08-08 21:20:17,456] {jobs.py:1664} INFO - Checking run <DagRun example_bash_operator @ 2016-08-01 00:00:00: scheduled__2016-08-01T00:00:00, externally triggered: False>
[2016-08-08 21:20:18,398] {jobs.py:1833} INFO - [backfill progress] | dag run 1 of 8 | tasks waiting: 0 | succeeded: 6 | kicked_off: 0 | failed: 0 | skipped: 0 | deadlocked: 0
[2016-08-08 21:20:18,398] {jobs.py:1664} INFO - Checking run <DagRun example_bash_operator @ 2016-08-01 00:00:00: scheduled__2016-08-01T00:00:00, externally triggered: False>
[2016-08-08 21:20:19,399] {jobs.py:1833} INFO - [backfill progress] | dag run 2 of 8 | tasks waiting: 0 | succeeded: 6 | kicked_off: 0 | failed: 0 | skipped: 0 | deadlocked: 0
[2016-08-08 21:20:19,399] {jobs.py:1664} INFO - Checking run <DagRun example_bash_operator @ 2016-08-01 00:00:00: scheduled__2016-08-01T00:00:00, externally triggered: False>
[2016-08-08 21:20:20,399] {jobs.py:1833} INFO - [backfill progress] | dag run 3 of 8 | tasks waiting: 0 | succeeded: 6 | kicked_off: 0 | failed: 0 | skipped: 0 | deadlocked: 0
[2016-08-08 21:20:20,400] {jobs.py:1664} INFO - Checking run <DagRun example_bash_operator @ 2016-08-01 00:00:00: scheduled__2016-08-01T00:00:00, externally triggered: False>
[2016-08-08 21:20:21,405] {jobs.py:1833} INFO - [backfill progress] | dag run 4 of 8 | tasks waiting: 0 | succeeded: 6 | kicked_off: 0 | failed: 0 | skipped: 0 | deadlocked: 0
[2016-08-08 21:20:21,405] {jobs.py:1664} INFO - Checking run <DagRun example_bash_operator @ 2016-08-01 00:00:00: scheduled__2016-08-01T00:00:00, externally triggered: False>
[2016-08-08 21:20:22,409] {jobs.py:1833} INFO - [backfill progress] | dag run 5 of 8 | tasks waiting: 0 | succeeded: 6 | kicked_off: 0 | failed: 0 | skipped: 0 | deadlocked: 0
[2016-08-08 21:20:22,410] {jobs.py:1664} INFO - Checking run <DagRun example_bash_operator @ 2016-08-01 00:00:00: scheduled__2016-08-01T00:00:00, externally triggered: False>
[2016-08-08 21:20:23,415] {jobs.py:1833} INFO - [backfill progress] | dag run 6 of 8 | tasks waiting: 0 | succeeded: 6 | kicked_off: 0 | failed: 0 | skipped: 0 | deadlocked: 0
[2016-08-08 21:20:23,415] {jobs.py:1664} INFO - Checking run <DagRun example_bash_operator @ 2016-08-01 00:00:00: scheduled__2016-08-01T00:00:00, externally triggered: False>
[2016-08-08 21:20:24,415] {jobs.py:1833} INFO - [backfill progress] | dag run 7 of 8 | tasks waiting: 0 | succeeded: 6 | kicked_off: 0 | failed: 0 | skipped: 0 | deadlocked: 0
[2016-08-08 21:20:24,415] {jobs.py:1664} INFO - Checking run <DagRun example_bash_operator @ 2016-08-01 00:00:00: scheduled__2016-08-01T00:00:00, externally triggered: False>
[2016-08-08 21:20:25,421] {jobs.py:1833} INFO - [backfill progress] | dag run 8 of 8 | tasks waiting: 0 | succeeded: 6 | kicked_off: 0 | failed: 0 | skipped: 0 | deadlocked: 0
[2016-08-08 21:20:25,424] {models.py:3728} INFO - Updating state for <DagRun example_bash_operator @ 2016-08-01 00:00:00: scheduled__2016-08-01T00:00:00, externally triggered: False> considering 6 task(s)
[2016-08-08 21:20:25,426] {models.py:3770} INFO - Marking run <DagRun example_bash_operator @ 2016-08-01 00:00:00: scheduled__2016-08-01T00:00:00, externally triggered: False> successful
[2016-08-08 21:20:25,428] {jobs.py:1865} INFO - Backfill done. Exiting.


jlowin commented Aug 9, 2016

Update: maybe I'm doing something wrong, because now no matter what I do I get the above behavior (with the exception of the first DagRun, which executes its tasks correctly). Am I missing something?

# - delete airflow.db to start fresh
# - dev/airflow-pr work_local 1667 to clone PR
airflow initdb
airflow scheduler
airflow backfill example_bash_operator -s 2016-08-01 -e 2016-08-08

output:

airflow backfill example_bash_operator -s 2016-08-01 -e 2016-08-08                                              21:26:07
[2016-08-08 21:27:25,358] {__init__.py:50} INFO - Using executor SequentialExecutor
[2016-08-08 21:27:25,657] {models.py:162} INFO - Filling up the DagBag from /Users/jlowin/airflow/dags
[2016-08-08 21:27:25,817] {jobs.py:1664} INFO - Checking run <DagRun example_bash_operator @ 2016-08-01 00:00:00: scheduled__2016-08-01T00:00:00, externally triggered: False>
[2016-08-08 21:27:25,828] {base_executor.py:50} INFO - Adding to queue: airflow run example_bash_operator runme_1 2016-08-01T00:00:00 --local -sd DAGS_FOLDER/example_dags/example_bash_operator.py
[2016-08-08 21:27:25,830] {base_executor.py:50} INFO - Adding to queue: airflow run example_bash_operator runme_0 2016-08-01T00:00:00 --local -sd DAGS_FOLDER/example_dags/example_bash_operator.py
[2016-08-08 21:27:25,833] {base_executor.py:50} INFO - Adding to queue: airflow run example_bash_operator also_run_this 2016-08-01T00:00:00 --local -sd DAGS_FOLDER/example_dags/example_bash_operator.py
[2016-08-08 21:27:25,836] {base_executor.py:50} INFO - Adding to queue: airflow run example_bash_operator runme_2 2016-08-01T00:00:00 --local -sd DAGS_FOLDER/example_dags/example_bash_operator.py
[2016-08-08 21:27:26,768] {sequential_executor.py:40} INFO - Executing command: airflow run example_bash_operator runme_1 2016-08-01T00:00:00 --local -sd DAGS_FOLDER/example_dags/example_bash_operator.py
[2016-08-08 21:27:27,547] {__init__.py:50} INFO - Using executor SequentialExecutor
Logging into: /Users/jlowin/airflow/logs/example_bash_operator/runme_1/2016-08-01T00:00:00
[2016-08-08 21:27:28,714] {__init__.py:50} INFO - Using executor SequentialExecutor
[2016-08-08 21:27:31,115] {sequential_executor.py:40} INFO - Executing command: airflow run example_bash_operator runme_0 2016-08-01T00:00:00 --local -sd DAGS_FOLDER/example_dags/example_bash_operator.py
[2016-08-08 21:27:31,885] {__init__.py:50} INFO - Using executor SequentialExecutor
Logging into: /Users/jlowin/airflow/logs/example_bash_operator/runme_0/2016-08-01T00:00:00
[2016-08-08 21:27:33,047] {__init__.py:50} INFO - Using executor SequentialExecutor
[2016-08-08 21:27:35,447] {sequential_executor.py:40} INFO - Executing command: airflow run example_bash_operator runme_2 2016-08-01T00:00:00 --local -sd DAGS_FOLDER/example_dags/example_bash_operator.py
[2016-08-08 21:27:36,209] {__init__.py:50} INFO - Using executor SequentialExecutor
Logging into: /Users/jlowin/airflow/logs/example_bash_operator/runme_2/2016-08-01T00:00:00
[2016-08-08 21:27:37,381] {__init__.py:50} INFO - Using executor SequentialExecutor
[2016-08-08 21:27:39,784] {sequential_executor.py:40} INFO - Executing command: airflow run example_bash_operator also_run_this 2016-08-01T00:00:00 --local -sd DAGS_FOLDER/example_dags/example_bash_operator.py
[2016-08-08 21:27:40,551] {__init__.py:50} INFO - Using executor SequentialExecutor
Logging into: /Users/jlowin/airflow/logs/example_bash_operator/also_run_this/2016-08-01T00:00:00
[2016-08-08 21:27:41,725] {__init__.py:50} INFO - Using executor SequentialExecutor
[2016-08-08 21:27:43,118] {jobs.py:1736} INFO - Executor state: success task <TaskInstance: example_bash_operator.runme_1 2016-08-01 00:00:00 [success]>
[2016-08-08 21:27:43,118] {jobs.py:1767} INFO - Task instance ('example_bash_operator', 'runme_1', datetime.datetime(2016, 8, 1, 0, 0)) succeeded
[2016-08-08 21:27:43,121] {jobs.py:1736} INFO - Executor state: success task <TaskInstance: example_bash_operator.runme_0 2016-08-01 00:00:00 [success]>
[2016-08-08 21:27:43,121] {jobs.py:1767} INFO - Task instance ('example_bash_operator', 'runme_0', datetime.datetime(2016, 8, 1, 0, 0)) succeeded
[2016-08-08 21:27:43,123] {jobs.py:1736} INFO - Executor state: success task <TaskInstance: example_bash_operator.also_run_this 2016-08-01 00:00:00 [success]>
[2016-08-08 21:27:43,123] {jobs.py:1767} INFO - Task instance ('example_bash_operator', 'also_run_this', datetime.datetime(2016, 8, 1, 0, 0)) succeeded
[2016-08-08 21:27:43,126] {jobs.py:1736} INFO - Executor state: success task <TaskInstance: example_bash_operator.runme_2 2016-08-01 00:00:00 [success]>
[2016-08-08 21:27:43,126] {jobs.py:1767} INFO - Task instance ('example_bash_operator', 'runme_2', datetime.datetime(2016, 8, 1, 0, 0)) succeeded
[2016-08-08 21:27:43,126] {jobs.py:1833} INFO - [backfill progress] | dag run 1 of 8 | tasks waiting: 2 | succeeded: 4 | kicked_off: 4 | failed: 0 | skipped: 0 | deadlocked: 0
[2016-08-08 21:27:43,139] {base_executor.py:50} INFO - Adding to queue: airflow run example_bash_operator run_after_loop 2016-08-01T00:00:00 --local -sd DAGS_FOLDER/example_dags/example_bash_operator.py
[2016-08-08 21:27:43,153] {sequential_executor.py:40} INFO - Executing command: airflow run example_bash_operator run_after_loop 2016-08-01T00:00:00 --local -sd DAGS_FOLDER/example_dags/example_bash_operator.py
[2016-08-08 21:27:43,921] {__init__.py:50} INFO - Using executor SequentialExecutor
Logging into: /Users/jlowin/airflow/logs/example_bash_operator/run_after_loop/2016-08-01T00:00:00
[2016-08-08 21:27:45,084] {__init__.py:50} INFO - Using executor SequentialExecutor
[2016-08-08 21:27:46,486] {jobs.py:1736} INFO - Executor state: success task <TaskInstance: example_bash_operator.run_after_loop 2016-08-01 00:00:00 [success]>
[2016-08-08 21:27:46,487] {jobs.py:1767} INFO - Task instance ('example_bash_operator', 'run_after_loop', datetime.datetime(2016, 8, 1, 0, 0)) succeeded
[2016-08-08 21:27:46,487] {jobs.py:1833} INFO - [backfill progress] | dag run 1 of 8 | tasks waiting: 1 | succeeded: 5 | kicked_off: 5 | failed: 0 | skipped: 0 | deadlocked: 0
[2016-08-08 21:27:46,498] {base_executor.py:50} INFO - Adding to queue: airflow run example_bash_operator run_this_last 2016-08-01T00:00:00 --local -sd DAGS_FOLDER/example_dags/example_bash_operator.py
[2016-08-08 21:27:46,505] {sequential_executor.py:40} INFO - Executing command: airflow run example_bash_operator run_this_last 2016-08-01T00:00:00 --local -sd DAGS_FOLDER/example_dags/example_bash_operator.py
[2016-08-08 21:27:47,269] {__init__.py:50} INFO - Using executor SequentialExecutor
Logging into: /Users/jlowin/airflow/logs/example_bash_operator/run_this_last/2016-08-01T00:00:00
[2016-08-08 21:27:48,430] {__init__.py:50} INFO - Using executor SequentialExecutor
[2016-08-08 21:27:49,830] {jobs.py:1736} INFO - Executor state: success task <TaskInstance: example_bash_operator.run_this_last 2016-08-01 00:00:00 [success]>
[2016-08-08 21:27:49,830] {jobs.py:1767} INFO - Task instance ('example_bash_operator', 'run_this_last', datetime.datetime(2016, 8, 1, 0, 0)) succeeded
[2016-08-08 21:27:49,830] {jobs.py:1833} INFO - [backfill progress] | dag run 1 of 8 | tasks waiting: 0 | succeeded: 6 | kicked_off: 6 | failed: 0 | skipped: 0 | deadlocked: 0
[2016-08-08 21:27:49,830] {jobs.py:1664} INFO - Checking run <DagRun example_bash_operator @ 2016-08-01 00:00:00: scheduled__2016-08-01T00:00:00, externally triggered: False>
[2016-08-08 21:27:49,851] {jobs.py:1833} INFO - [backfill progress] | dag run 2 of 8 | tasks waiting: 0 | succeeded: 6 | kicked_off: 6 | failed: 0 | skipped: 0 | deadlocked: 0
[2016-08-08 21:27:49,852] {jobs.py:1664} INFO - Checking run <DagRun example_bash_operator @ 2016-08-01 00:00:00: scheduled__2016-08-01T00:00:00, externally triggered: False>
[2016-08-08 21:27:50,853] {jobs.py:1833} INFO - [backfill progress] | dag run 3 of 8 | tasks waiting: 0 | succeeded: 6 | kicked_off: 6 | failed: 0 | skipped: 0 | deadlocked: 0
[2016-08-08 21:27:50,853] {jobs.py:1664} INFO - Checking run <DagRun example_bash_operator @ 2016-08-01 00:00:00: scheduled__2016-08-01T00:00:00, externally triggered: False>
[2016-08-08 21:27:51,859] {jobs.py:1833} INFO - [backfill progress] | dag run 4 of 8 | tasks waiting: 0 | succeeded: 6 | kicked_off: 6 | failed: 0 | skipped: 0 | deadlocked: 0
[2016-08-08 21:27:51,859] {jobs.py:1664} INFO - Checking run <DagRun example_bash_operator @ 2016-08-01 00:00:00: scheduled__2016-08-01T00:00:00, externally triggered: False>
[2016-08-08 21:27:52,859] {jobs.py:1833} INFO - [backfill progress] | dag run 5 of 8 | tasks waiting: 0 | succeeded: 6 | kicked_off: 6 | failed: 0 | skipped: 0 | deadlocked: 0
# ...etc...

Now I feel like maybe I'm making a mistake somewhere??


bolkedebruin commented Aug 9, 2016

@plypaul I see. I need to look at that function because it uses TaskInstances as a basis which I would like to rework to a DagRun basis. To make it work now I would need to add a ti.get_dag_run method, which is not guaranteed to be unique.

@jlowin fixed the issue, good catch. I will add a test for it. Can you retest?

@bolkedebruin bolkedebruin force-pushed the backfill_dagrun branch 2 times, most recently from ffdaace to 92f50fb Compare August 9, 2016 20:30
@bolkedebruin (Contributor, Author)

@plypaul @jlowin Tests added, issues fixed.

Backfill jobs create taskinstances without any associated
DagRuns. This creates consistency errors. This patch addresses
this issue and also makes the scheduler backfill aware.

The scheduler makes sure to schedule new dag runs after the
last dag run including backfills. It will not pick up
any tasks that are part of a backfill as those are considered
to be managed by the backfill process. This can be (and should
be) changed when backfill are running in a scheduled fashion.

It doesn't deal with the remaining issue that backfills can be
scheduled on top of existing dag runs and that due to this
TaskInstances can point to multiple DagRuns.

jlowin commented Aug 10, 2016

@bolkedebruin looking good now! I can't tell because of the rebase, what did you change to fix it?

@bolkedebruin (Contributor, Author)

@jlowin I considered the next run date instead of just the start date, and made sure the dag run state was checked on every dag run iteration.


jlowin commented Aug 11, 2016

@bolkedebruin I think this is in good shape. +1 from me.

I accidentally bumped into the "max active runs" issue by testing the example_passing_params_via_test_command DAG over 8 days (same code as above)... turns out it has a schedule_interval of one minute, so I immediately had 400 active DagRuns. Not sure if we want to address that as a core issue here (I know there was some discussion a while ago), but it's the only thing I'd flag as needing handling at some point (maybe in a future PR).

@bolkedebruin (Contributor, Author)

@jlowin thanks. I would like to leave the concurrency bit for when I move the backfill to a version that uses the scheduler. That should simplify things.

Can you merge it for me?


plypaul commented Aug 12, 2016

@bolkedebruin To clarify, it's possible to have multiple DAG runs for the same (dag_id, execution_date, start_date)?

@asfgit asfgit closed this in c39e4f6 Aug 12, 2016

jlowin commented Aug 12, 2016

@bolkedebruin merged!

@bolkedebruin (Contributor, Author)

@jlowin thanks!

@plypaul it is. However, if a previous dagrun exists, that dagrun will be used. In the case of a scheduled dagrun, its run_id will not be rewritten to a backfill run_id. This will need to be addressed separately; some discussion on the approach is probably required. Does this answer your question?
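The reuse behavior described in the reply above can be sketched like this. Names and dict-based runs are illustrative, not the real Airflow models:

```python
def run_for_backfill(existing_runs, execution_date):
    """Reuse any existing run for this execution_date unchanged (a scheduled
    run keeps its scheduled run_id); only create a backfill run when no run
    exists yet for that date."""
    for run in existing_runs:
        if run["execution_date"] == execution_date:
            return run  # reused as-is, run_id not rewritten
    return {"execution_date": execution_date,
            "run_id": "backfill_" + execution_date}

existing = [{"execution_date": "2016-08-01",
             "run_id": "scheduled__2016-08-01T00:00:00"}]
reused = run_for_backfill(existing, "2016-08-01")   # keeps scheduled run_id
created = run_for_backfill(existing, "2016-08-02")  # new backfill run
```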

alekstorm pushed a commit to alekstorm/incubator-airflow that referenced this pull request Jun 1, 2017

Closes apache#1667 from bolkedebruin/backfill_dagrun