Skip to content

Commit

Permalink
[AIRFLOW-1837] Respect task start_date when different from dag's (apa…
Browse files Browse the repository at this point in the history
…che#4010)

Currently task instances get created and scheduled based on the DAG's
start date rather than their own.  This commit adds a check before
creating a task instance to see that the start date is not after
the execution date.
  • Loading branch information
dima-asana authored and ashb committed Oct 22, 2018
1 parent 2ad664d commit b81a3c6
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 30 deletions.
2 changes: 2 additions & 0 deletions airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -5153,6 +5153,8 @@ def verify_integrity(self, session=None):
for task in six.itervalues(dag.task_dict):
if task.adhoc:
continue
if task.start_date > self.execution_date and not self.is_backfill:
continue

if task.task_id not in task_ids:
ti = TaskInstance(task, self.execution_date)
Expand Down
29 changes: 15 additions & 14 deletions tests/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
from airflow.utils import timezone
from airflow.utils.timezone import datetime
from airflow.utils.state import State
from airflow.utils.dates import infer_time_unit, round_time, scale_time_units
from airflow.utils.dates import days_ago, infer_time_unit, round_time, scale_time_units
from lxml import html
from airflow.exceptions import AirflowException
from airflow.configuration import AirflowConfigException, run_command
Expand All @@ -81,6 +81,7 @@
DEFAULT_DATE_ISO = DEFAULT_DATE.isoformat()
DEFAULT_DATE_DS = DEFAULT_DATE_ISO[:10]
TEST_DAG_ID = 'unit_tests'
EXAMPLE_DAG_DEFAULT_DATE = days_ago(2)

try:
import cPickle as pickle
Expand Down Expand Up @@ -1651,21 +1652,21 @@ def setUp(self):

self.dagrun_python = self.dag_python.create_dagrun(
run_id="test_{}".format(models.DagRun.id_for_date(timezone.utcnow())),
execution_date=DEFAULT_DATE,
execution_date=EXAMPLE_DAG_DEFAULT_DATE,
start_date=timezone.utcnow(),
state=State.RUNNING
)

self.sub_dag.create_dagrun(
run_id="test_{}".format(models.DagRun.id_for_date(timezone.utcnow())),
execution_date=DEFAULT_DATE,
execution_date=EXAMPLE_DAG_DEFAULT_DATE,
start_date=timezone.utcnow(),
state=State.RUNNING
)

self.example_xcom.create_dagrun(
run_id="test_{}".format(models.DagRun.id_for_date(timezone.utcnow())),
execution_date=DEFAULT_DATE,
execution_date=EXAMPLE_DAG_DEFAULT_DATE,
start_date=timezone.utcnow(),
state=State.RUNNING
)
Expand Down Expand Up @@ -1758,7 +1759,7 @@ def test_dag_views(self):
response = self.app.get(
'/admin/airflow/task?'
'task_id=runme_0&dag_id=example_bash_operator&'
'execution_date={}'.format(DEFAULT_DATE_DS))
'execution_date={}'.format(EXAMPLE_DAG_DEFAULT_DATE))
self.assertIn("Attributes", response.data.decode('utf-8'))
response = self.app.get(
'/admin/airflow/dag_stats')
Expand All @@ -1770,22 +1771,21 @@ def test_dag_views(self):
"/admin/airflow/success?task_id=print_the_context&"
"dag_id=example_python_operator&upstream=false&downstream=false&"
"future=false&past=false&execution_date={}&"
"origin=/admin".format(DEFAULT_DATE_DS))
"origin=/admin".format(EXAMPLE_DAG_DEFAULT_DATE))
response = self.app.get(url)
self.assertIn("Wait a minute", response.data.decode('utf-8'))
response = self.app.get(url + "&confirmed=true")
response = self.app.get(
'/admin/airflow/clear?task_id=print_the_context&'
'dag_id=example_python_operator&future=true&past=false&'
'upstream=true&downstream=false&'
'execution_date={}&'
'origin=/admin'.format(DEFAULT_DATE_DS))
'origin=/admin'.format(EXAMPLE_DAG_DEFAULT_DATE))
self.assertIn("Wait a minute", response.data.decode('utf-8'))
url = (
"/admin/airflow/success?task_id=section-1&"
"dag_id=example_subdag_operator&upstream=true&downstream=true&"
"future=false&past=false&execution_date={}&"
"origin=/admin".format(DEFAULT_DATE_DS))
"origin=/admin".format(EXAMPLE_DAG_DEFAULT_DATE))
response = self.app.get(url)
self.assertIn("Wait a minute", response.data.decode('utf-8'))
self.assertIn("section-1-task-1", response.data.decode('utf-8'))
Expand All @@ -1799,7 +1799,7 @@ def test_dag_views(self):
"dag_id=example_python_operator&future=false&past=false&"
"upstream=false&downstream=true&"
"execution_date={}&"
"origin=/admin".format(DEFAULT_DATE_DS))
"origin=/admin".format(EXAMPLE_DAG_DEFAULT_DATE))
response = self.app.get(url)
self.assertIn("Wait a minute", response.data.decode('utf-8'))
response = self.app.get(url + "&confirmed=true")
Expand All @@ -1808,7 +1808,7 @@ def test_dag_views(self):
"dag_id=example_subdag_operator.section-1&future=false&past=false&"
"upstream=false&downstream=true&recursive=true&"
"execution_date={}&"
"origin=/admin".format(DEFAULT_DATE_DS))
"origin=/admin".format(EXAMPLE_DAG_DEFAULT_DATE))
response = self.app.get(url)
self.assertIn("Wait a minute", response.data.decode('utf-8'))
self.assertIn("example_subdag_operator.end",
Expand All @@ -1835,7 +1835,7 @@ def test_dag_views(self):
"/admin/airflow/run?task_id=runme_0&"
"dag_id=example_bash_operator&ignore_all_deps=false&ignore_ti_state=true&"
"ignore_task_deps=true&execution_date={}&"
"origin=/admin".format(DEFAULT_DATE_DS))
"origin=/admin".format(EXAMPLE_DAG_DEFAULT_DATE))
response = self.app.get(url)
response = self.app.get(
"/admin/airflow/refresh?dag_id=example_bash_operator")
Expand Down Expand Up @@ -1870,13 +1870,14 @@ def test_fetch_task_instance(self):
url = (
"/admin/airflow/object/task_instances?"
"dag_id=example_python_operator&"
"execution_date={}".format(DEFAULT_DATE_DS))
"execution_date={}".format(EXAMPLE_DAG_DEFAULT_DATE))
response = self.app.get(url)
self.assertIn("print_the_context", response.data.decode('utf-8'))

def tearDown(self):
configuration.conf.set("webserver", "expose_config", "False")
self.dag_bash.clear(start_date=DEFAULT_DATE, end_date=timezone.utcnow())
self.dag_bash.clear(start_date=EXAMPLE_DAG_DEFAULT_DATE,
end_date=timezone.utcnow())
session = Session()
session.query(models.DagRun).delete()
session.query(models.TaskInstance).delete()
Expand Down
22 changes: 19 additions & 3 deletions tests/dags/test_scheduler_dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,34 @@
# specific language governing permissions and limitations
# under the License.

from datetime import datetime
from datetime import datetime, timedelta

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
DEFAULT_DATE = datetime(2100, 1, 1)
DEFAULT_DATE = datetime(2016, 1, 1)

# DAG tests backfill with pooled tasks
# Previously backfill would queue the task but never run it
dag1 = DAG(
dag_id='test_start_date_scheduling',
start_date=datetime(2100, 1, 1))
start_date=datetime.utcnow() + timedelta(days=1))
dag1_task1 = DummyOperator(
task_id='dummy',
dag=dag1,
owner='airflow')

dag2 = DAG(
dag_id='test_task_start_date_scheduling',
start_date=DEFAULT_DATE
)
dag2_task1 = DummyOperator(
task_id='dummy1',
dag=dag2,
owner='airflow',
start_date=DEFAULT_DATE + timedelta(days=3)
)
dag2_task2 = DummyOperator(
task_id='dummy2',
dag=dag2,
owner='airflow'
)
23 changes: 22 additions & 1 deletion tests/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2209,7 +2209,7 @@ def test_scheduler_start_date(self):
dag_id = 'test_start_date_scheduling'
dag = self.dagbag.get_dag(dag_id)
dag.clear()
self.assertTrue(dag.start_date > DEFAULT_DATE)
self.assertTrue(dag.start_date > datetime.datetime.utcnow())

scheduler = SchedulerJob(dag_id,
num_runs=2)
Expand Down Expand Up @@ -2244,6 +2244,27 @@ def test_scheduler_start_date(self):
self.assertEqual(
len(session.query(TI).filter(TI.dag_id == dag_id).all()), 1)

def test_scheduler_task_start_date(self):
"""
Test that the scheduler respects task start dates that are different
from DAG start dates
"""
dag_id = 'test_task_start_date_scheduling'
dag = self.dagbag.get_dag(dag_id)
dag.clear()
scheduler = SchedulerJob(dag_id,
num_runs=2)
scheduler.run()

session = settings.Session()
tiq = session.query(TI).filter(TI.dag_id == dag_id)
ti1s = tiq.filter(TI.task_id == 'dummy1').all()
ti2s = tiq.filter(TI.task_id == 'dummy2').all()
self.assertEqual(len(ti1s), 0)
self.assertEqual(len(ti2s), 2)
for t in ti2s:
self.assertEqual(t.state, State.SUCCESS)

def test_scheduler_multiprocessing(self):
"""
Test that the scheduler can successfully queue multiple dags in parallel
Expand Down
24 changes: 12 additions & 12 deletions tests/www_rbac/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
from airflow.models import DAG, DagRun, TaskInstance
from airflow.operators.dummy_operator import DummyOperator
from airflow.settings import Session
from airflow.utils import timezone
from airflow.utils import dates, timezone
from airflow.utils.state import State
from airflow.utils.timezone import datetime
from airflow.www_rbac import app as application
Expand Down Expand Up @@ -263,8 +263,8 @@ def test_mount(self):


class TestAirflowBaseViews(TestBase):
default_date = timezone.datetime(2018, 3, 1)
run_id = "test_{}".format(models.DagRun.id_for_date(default_date))
EXAMPLE_DAG_DEFAULT_DATE = dates.days_ago(2)
run_id = "test_{}".format(models.DagRun.id_for_date(EXAMPLE_DAG_DEFAULT_DATE))

def setUp(self):
super(TestAirflowBaseViews, self).setUp()
Expand All @@ -291,19 +291,19 @@ def prepare_dagruns(self):

self.bash_dagrun = self.bash_dag.create_dagrun(
run_id=self.run_id,
execution_date=self.default_date,
execution_date=self.EXAMPLE_DAG_DEFAULT_DATE,
start_date=timezone.utcnow(),
state=State.RUNNING)

self.sub_dagrun = self.sub_dag.create_dagrun(
run_id=self.run_id,
execution_date=self.default_date,
execution_date=self.EXAMPLE_DAG_DEFAULT_DATE,
start_date=timezone.utcnow(),
state=State.RUNNING)

self.xcom_dagrun = self.xcom_dag.create_dagrun(
run_id=self.run_id,
execution_date=self.default_date,
execution_date=self.EXAMPLE_DAG_DEFAULT_DATE,
start_date=timezone.utcnow(),
state=State.RUNNING)

Expand All @@ -321,19 +321,19 @@ def test_home(self):

def test_task(self):
url = ('task?task_id=runme_0&dag_id=example_bash_operator&execution_date={}'
.format(self.percent_encode(self.default_date)))
.format(self.percent_encode(self.EXAMPLE_DAG_DEFAULT_DATE)))
resp = self.client.get(url, follow_redirects=True)
self.check_content_in_response('Task Instance Details', resp)

def test_xcom(self):
url = ('xcom?task_id=runme_0&dag_id=example_bash_operator&execution_date={}'
.format(self.percent_encode(self.default_date)))
.format(self.percent_encode(self.EXAMPLE_DAG_DEFAULT_DATE)))
resp = self.client.get(url, follow_redirects=True)
self.check_content_in_response('XCom', resp)

def test_rendered(self):
url = ('rendered?task_id=runme_0&dag_id=example_bash_operator&execution_date={}'
.format(self.percent_encode(self.default_date)))
.format(self.percent_encode(self.EXAMPLE_DAG_DEFAULT_DATE)))
resp = self.client.get(url, follow_redirects=True)
self.check_content_in_response('Rendered Template', resp)

Expand Down Expand Up @@ -404,21 +404,21 @@ def test_success(self):

url = ('success?task_id=run_this_last&dag_id=example_bash_operator&'
'execution_date={}&upstream=false&downstream=false&future=false&past=false'
.format(self.percent_encode(self.default_date)))
.format(self.percent_encode(self.EXAMPLE_DAG_DEFAULT_DATE)))
resp = self.client.get(url)
self.check_content_in_response('Wait a minute', resp)

def test_clear(self):
url = ('clear?task_id=runme_1&dag_id=example_bash_operator&'
'execution_date={}&upstream=false&downstream=false&future=false&past=false'
.format(self.percent_encode(self.default_date)))
.format(self.percent_encode(self.EXAMPLE_DAG_DEFAULT_DATE)))
resp = self.client.get(url)
self.check_content_in_response(['example_bash_operator', 'Wait a minute'], resp)

def test_run(self):
url = ('run?task_id=runme_0&dag_id=example_bash_operator&ignore_all_deps=false&'
'ignore_ti_state=true&execution_date={}'
.format(self.percent_encode(self.default_date)))
.format(self.percent_encode(self.EXAMPLE_DAG_DEFAULT_DATE)))
resp = self.client.get(url)
self.check_content_in_response('', resp, resp_code=302)

Expand Down

0 comments on commit b81a3c6

Please sign in to comment.