diff --git a/airflow/models.py b/airflow/models.py index 22e8d2596a95b..775d9f63de178 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -5254,6 +5254,8 @@ def verify_integrity(self, session=None): for task in six.itervalues(dag.task_dict): if task.adhoc: continue + if task.start_date > self.execution_date and not self.is_backfill: + continue if task.task_id not in task_ids: Stats.incr( diff --git a/tests/core.py b/tests/core.py index c37b1f9c8ba31..918e9b4d49282 100644 --- a/tests/core.py +++ b/tests/core.py @@ -62,7 +62,7 @@ from airflow.utils import timezone from airflow.utils.timezone import datetime from airflow.utils.state import State -from airflow.utils.dates import infer_time_unit, round_time, scale_time_units +from airflow.utils.dates import days_ago, infer_time_unit, round_time, scale_time_units from lxml import html from airflow.exceptions import AirflowException from airflow.configuration import AirflowConfigException, run_command @@ -80,6 +80,7 @@ DEFAULT_DATE_ISO = DEFAULT_DATE.isoformat() DEFAULT_DATE_DS = DEFAULT_DATE_ISO[:10] TEST_DAG_ID = 'unit_tests' +EXAMPLE_DAG_DEFAULT_DATE = days_ago(2) try: import cPickle as pickle @@ -1805,21 +1806,21 @@ def setUp(self): self.dagrun_python = self.dag_python.create_dagrun( run_id="test_{}".format(models.DagRun.id_for_date(timezone.utcnow())), - execution_date=DEFAULT_DATE, + execution_date=EXAMPLE_DAG_DEFAULT_DATE, start_date=timezone.utcnow(), state=State.RUNNING ) self.sub_dag.create_dagrun( run_id="test_{}".format(models.DagRun.id_for_date(timezone.utcnow())), - execution_date=DEFAULT_DATE, + execution_date=EXAMPLE_DAG_DEFAULT_DATE, start_date=timezone.utcnow(), state=State.RUNNING ) self.example_xcom.create_dagrun( run_id="test_{}".format(models.DagRun.id_for_date(timezone.utcnow())), - execution_date=DEFAULT_DATE, + execution_date=EXAMPLE_DAG_DEFAULT_DATE, start_date=timezone.utcnow(), state=State.RUNNING ) @@ -1912,7 +1913,7 @@ def test_dag_views(self): response = self.app.get( '/admin/airflow/task?' 'task_id=runme_0&dag_id=example_bash_operator&' - 'execution_date={}'.format(DEFAULT_DATE_DS)) + 'execution_date={}'.format(EXAMPLE_DAG_DEFAULT_DATE)) self.assertIn("Attributes", response.data.decode('utf-8')) response = self.app.get( '/admin/airflow/dag_stats') @@ -1924,22 +1925,21 @@ def test_dag_views(self): "/admin/airflow/success?task_id=print_the_context&" "dag_id=example_python_operator&upstream=false&downstream=false&" "future=false&past=false&execution_date={}&" - "origin=/admin".format(DEFAULT_DATE_DS)) + "origin=/admin".format(EXAMPLE_DAG_DEFAULT_DATE)) response = self.app.get(url) self.assertIn("Wait a minute", response.data.decode('utf-8')) - response = self.app.get(url + "&confirmed=true") response = self.app.get( '/admin/airflow/clear?task_id=print_the_context&' 'dag_id=example_python_operator&future=true&past=false&' 'upstream=true&downstream=false&' 'execution_date={}&' - 'origin=/admin'.format(DEFAULT_DATE_DS)) + 'origin=/admin'.format(EXAMPLE_DAG_DEFAULT_DATE)) self.assertIn("Wait a minute", response.data.decode('utf-8')) url = ( "/admin/airflow/success?task_id=section-1&" "dag_id=example_subdag_operator&upstream=true&downstream=true&" "future=false&past=false&execution_date={}&" - "origin=/admin".format(DEFAULT_DATE_DS)) + "origin=/admin".format(EXAMPLE_DAG_DEFAULT_DATE)) response = self.app.get(url) self.assertIn("Wait a minute", response.data.decode('utf-8')) self.assertIn("section-1-task-1", response.data.decode('utf-8')) @@ -1953,7 +1953,7 @@ def test_dag_views(self): "dag_id=example_python_operator&future=false&past=false&" "upstream=false&downstream=true&" "execution_date={}&" - "origin=/admin".format(DEFAULT_DATE_DS)) + "origin=/admin".format(EXAMPLE_DAG_DEFAULT_DATE)) response = self.app.get(url) self.assertIn("Wait a minute", response.data.decode('utf-8')) response = self.app.get(url + "&confirmed=true") @@ -1962,7 +1962,7 @@ def test_dag_views(self): "dag_id=example_subdag_operator.section-1&future=false&past=false&" "upstream=false&downstream=true&recursive=true&" "execution_date={}&" - "origin=/admin".format(DEFAULT_DATE_DS)) + "origin=/admin".format(EXAMPLE_DAG_DEFAULT_DATE)) response = self.app.get(url) self.assertIn("Wait a minute", response.data.decode('utf-8')) self.assertIn("example_subdag_operator.end", @@ -1989,7 +1989,7 @@ def test_dag_views(self): "/admin/airflow/run?task_id=runme_0&" "dag_id=example_bash_operator&ignore_all_deps=false&ignore_ti_state=true&" "ignore_task_deps=true&execution_date={}&" - "origin=/admin".format(DEFAULT_DATE_DS)) + "origin=/admin".format(EXAMPLE_DAG_DEFAULT_DATE)) response = self.app.get(url) response = self.app.get( "/admin/airflow/refresh?dag_id=example_bash_operator") @@ -2024,7 +2024,7 @@ def test_fetch_task_instance(self): url = ( "/admin/airflow/object/task_instances?" "dag_id=example_python_operator&" - "execution_date={}".format(DEFAULT_DATE_DS)) + "execution_date={}".format(EXAMPLE_DAG_DEFAULT_DATE)) response = self.app.get(url) self.assertIn("print_the_context", response.data.decode('utf-8')) @@ -2032,19 +2032,20 @@ def test_dag_view_task_with_python_operator_using_partial(self): response = self.app.get( '/admin/airflow/task?' 'task_id=test_dagrun_functool_partial&dag_id=test_task_view_type_check&' - 'execution_date={}'.format(DEFAULT_DATE_DS)) + 'execution_date={}'.format(EXAMPLE_DAG_DEFAULT_DATE)) self.assertIn("A function with two args", response.data.decode('utf-8')) def test_dag_view_task_with_python_operator_using_instance(self): response = self.app.get( '/admin/airflow/task?' 'task_id=test_dagrun_instance&dag_id=test_task_view_type_check&' - 'execution_date={}'.format(DEFAULT_DATE_DS)) + 'execution_date={}'.format(EXAMPLE_DAG_DEFAULT_DATE)) self.assertIn("A __call__ method", response.data.decode('utf-8')) def tearDown(self): configuration.conf.set("webserver", "expose_config", "False") - self.dag_bash.clear(start_date=DEFAULT_DATE, end_date=timezone.utcnow()) + self.dag_bash.clear(start_date=EXAMPLE_DAG_DEFAULT_DATE, + end_date=timezone.utcnow()) session = Session() session.query(models.DagRun).delete() session.query(models.TaskInstance).delete() diff --git a/tests/dags/test_scheduler_dags.py b/tests/dags/test_scheduler_dags.py index ae2bd202d9803..94e6f8216d226 100644 --- a/tests/dags/test_scheduler_dags.py +++ b/tests/dags/test_scheduler_dags.py @@ -17,18 +17,34 @@ # specific language governing permissions and limitations # under the License. -from datetime import datetime +from datetime import datetime, timedelta from airflow.models import DAG from airflow.operators.dummy_operator import DummyOperator -DEFAULT_DATE = datetime(2100, 1, 1) +DEFAULT_DATE = datetime(2016, 1, 1) # DAG tests backfill with pooled tasks # Previously backfill would queue the task but never run it dag1 = DAG( dag_id='test_start_date_scheduling', - start_date=datetime(2100, 1, 1)) + start_date=datetime.utcnow() + timedelta(days=1)) dag1_task1 = DummyOperator( task_id='dummy', dag=dag1, owner='airflow') + +dag2 = DAG( + dag_id='test_task_start_date_scheduling', + start_date=DEFAULT_DATE +) +dag2_task1 = DummyOperator( + task_id='dummy1', + dag=dag2, + owner='airflow', + start_date=DEFAULT_DATE + timedelta(days=3) +) +dag2_task2 = DummyOperator( + task_id='dummy2', + dag=dag2, + owner='airflow' +) diff --git a/tests/jobs.py b/tests/jobs.py index bb714bd201602..9dcd15fbe6120 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -2217,7 +2217,7 @@ def test_scheduler_start_date(self): dag_id = 'test_start_date_scheduling' dag = self.dagbag.get_dag(dag_id) dag.clear() - self.assertTrue(dag.start_date > DEFAULT_DATE) + self.assertTrue(dag.start_date > datetime.datetime.utcnow()) scheduler = SchedulerJob(dag_id, num_runs=2) @@ -2252,6 +2252,27 @@ def test_scheduler_start_date(self): self.assertEqual( len(session.query(TI).filter(TI.dag_id == dag_id).all()), 1) + def test_scheduler_task_start_date(self): + """ + Test that the scheduler respects task start dates that are different + from DAG start dates + """ + dag_id = 'test_task_start_date_scheduling' + dag = self.dagbag.get_dag(dag_id) + dag.clear() + scheduler = SchedulerJob(dag_id, + num_runs=2) + scheduler.run() + + session = settings.Session() + tiq = session.query(TI).filter(TI.dag_id == dag_id) + ti1s = tiq.filter(TI.task_id == 'dummy1').all() + ti2s = tiq.filter(TI.task_id == 'dummy2').all() + self.assertEqual(len(ti1s), 0) + self.assertEqual(len(ti2s), 2) + for t in ti2s: + self.assertEqual(t.state, State.SUCCESS) + def test_scheduler_multiprocessing(self): """ Test that the scheduler can successfully queue multiple dags in parallel diff --git a/tests/www_rbac/test_views.py b/tests/www_rbac/test_views.py index 64531244656a4..a952b9874c088 100644 --- a/tests/www_rbac/test_views.py +++ b/tests/www_rbac/test_views.py @@ -39,7 +39,7 @@ from airflow.models import DAG, DagRun, TaskInstance from airflow.operators.dummy_operator import DummyOperator from airflow.settings import Session -from airflow.utils import timezone +from airflow.utils import dates, timezone from airflow.utils.state import State from airflow.utils.timezone import datetime from airflow.www_rbac import app as application @@ -267,8 +267,8 @@ def test_mount(self): class TestAirflowBaseViews(TestBase): - default_date = timezone.datetime(2018, 3, 1) - run_id = "test_{}".format(models.DagRun.id_for_date(default_date)) + EXAMPLE_DAG_DEFAULT_DATE = dates.days_ago(2) + run_id = "test_{}".format(models.DagRun.id_for_date(EXAMPLE_DAG_DEFAULT_DATE)) def setUp(self): super(TestAirflowBaseViews, self).setUp() @@ -297,19 +297,19 @@ def prepare_dagruns(self): self.bash_dagrun = self.bash_dag.create_dagrun( run_id=self.run_id, - execution_date=self.default_date, + execution_date=self.EXAMPLE_DAG_DEFAULT_DATE, start_date=timezone.utcnow(), state=State.RUNNING) self.sub_dagrun = self.sub_dag.create_dagrun( run_id=self.run_id, - execution_date=self.default_date, + execution_date=self.EXAMPLE_DAG_DEFAULT_DATE, start_date=timezone.utcnow(), state=State.RUNNING) self.xcom_dagrun = self.xcom_dag.create_dagrun( run_id=self.run_id, - execution_date=self.default_date, + execution_date=self.EXAMPLE_DAG_DEFAULT_DATE, start_date=timezone.utcnow(), state=State.RUNNING) @@ -327,19 +327,19 @@ def test_home(self): def test_task(self): url = ('task?task_id=runme_0&dag_id=example_bash_operator&execution_date={}' - .format(self.percent_encode(self.default_date))) + .format(self.percent_encode(self.EXAMPLE_DAG_DEFAULT_DATE))) resp = self.client.get(url, follow_redirects=True) self.check_content_in_response('Task Instance Details', resp) def test_xcom(self): url = ('xcom?task_id=runme_0&dag_id=example_bash_operator&execution_date={}' - .format(self.percent_encode(self.default_date))) + .format(self.percent_encode(self.EXAMPLE_DAG_DEFAULT_DATE))) resp = self.client.get(url, follow_redirects=True) self.check_content_in_response('XCom', resp) def test_rendered(self): url = ('rendered?task_id=runme_0&dag_id=example_bash_operator&execution_date={}' - .format(self.percent_encode(self.default_date))) + .format(self.percent_encode(self.EXAMPLE_DAG_DEFAULT_DATE))) resp = self.client.get(url, follow_redirects=True) self.check_content_in_response('Rendered Template', resp) @@ -409,28 +409,28 @@ def test_paused(self): def test_failed(self): url = ('failed?task_id=run_this_last&dag_id=example_bash_operator&' 'execution_date={}&upstream=false&downstream=false&future=false&past=false' - .format(self.percent_encode(self.default_date))) + .format(self.percent_encode(self.EXAMPLE_DAG_DEFAULT_DATE))) resp = self.client.get(url) self.check_content_in_response('Wait a minute', resp) def test_success(self): url = ('success?task_id=run_this_last&dag_id=example_bash_operator&' 'execution_date={}&upstream=false&downstream=false&future=false&past=false' - .format(self.percent_encode(self.default_date))) + .format(self.percent_encode(self.EXAMPLE_DAG_DEFAULT_DATE))) resp = self.client.get(url) self.check_content_in_response('Wait a minute', resp) def test_clear(self): url = ('clear?task_id=runme_1&dag_id=example_bash_operator&' 'execution_date={}&upstream=false&downstream=false&future=false&past=false' - .format(self.percent_encode(self.default_date))) + .format(self.percent_encode(self.EXAMPLE_DAG_DEFAULT_DATE))) resp = self.client.get(url) self.check_content_in_response(['example_bash_operator', 'Wait a minute'], resp) def test_run(self): url = ('run?task_id=runme_0&dag_id=example_bash_operator&ignore_all_deps=false&' 'ignore_ti_state=true&execution_date={}' - .format(self.percent_encode(self.default_date))) + .format(self.percent_encode(self.EXAMPLE_DAG_DEFAULT_DATE))) resp = self.client.get(url) self.check_content_in_response('', resp, resp_code=302)