From 9afd0a597f1f2870b3f5966cca5fed4bfcaa882e Mon Sep 17 00:00:00 2001 From: Bhavani Ravi Date: Tue, 14 Sep 2021 12:33:27 +0530 Subject: [PATCH 01/11] Add start date to trigger_dagrun operator --- .../api/common/experimental/trigger_dag.py | 1 + tests/operators/test_trigger_dagrun.py | 31 +++++++++++++++++++ 2 files changed, 32 insertions(+) diff --git a/airflow/api/common/experimental/trigger_dag.py b/airflow/api/common/experimental/trigger_dag.py index f82f88feeac7b..cd6c7defb193d 100644 --- a/airflow/api/common/experimental/trigger_dag.py +++ b/airflow/api/common/experimental/trigger_dag.py @@ -82,6 +82,7 @@ def _trigger_dag( for _dag in dags_to_trigger: trigger = _dag.create_dagrun( run_id=run_id, + start_date=timezone.utcnow(), execution_date=execution_date, state=State.RUNNING, conf=run_conf, diff --git a/tests/operators/test_trigger_dagrun.py b/tests/operators/test_trigger_dagrun.py index 1bdc59bde8a2d..72720d1a52c7b 100644 --- a/tests/operators/test_trigger_dagrun.py +++ b/tests/operators/test_trigger_dagrun.py @@ -255,3 +255,34 @@ def test_trigger_dagrun_with_wait_for_completion_true_fail(self): ) with pytest.raises(AirflowException): task.run(start_date=execution_date, end_date=execution_date) + + def test_trigger_dagrun_triggering_itself(self): + """Test TriggerDagRunOperator that triggers itself""" + execution_date = DEFAULT_DATE + task = TriggerDagRunOperator( + task_id="test_task", + trigger_dag_id=self.dag.dag_id, + allowed_states=[State.RUNNING, State.SUCCESS], + dag=self.dag, + ) + task.run(start_date=execution_date, end_date=execution_date) + + with create_session() as session: + dagruns = session.query(DagRun).filter(DagRun.dag_id == self.dag.dag_id).all() + assert len(dagruns) == 2 + assert isinstance(dagruns[1].start_date, datetime) + + def test_trigger_dagrun_triggering_itself_with_execution_date(self): + """Test TriggerDagRunOperator that triggers itself with execution date, fails with DagRunAlreadyExists""" + execution_date = DEFAULT_DATE + task = TriggerDagRunOperator( + task_id="test_task", + trigger_dag_id=self.dag.dag_id, + execution_date=execution_date, + allowed_states=[State.RUNNING, State.SUCCESS], + dag=self.dag, + ) + with pytest.raises(DagRunAlreadyExists): + task.run(start_date=execution_date, end_date=execution_date) + + From f326d9ed986e59d99c594e8042f3b9b0ae61122f Mon Sep 17 00:00:00 2001 From: Bhavani Ravi Date: Tue, 14 Sep 2021 16:15:09 +0530 Subject: [PATCH 02/11] fix failing testcase added to mock call --- tests/api/client/test_local_client.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/api/client/test_local_client.py b/tests/api/client/test_local_client.py index c20a1b98112ed..8a04c741a6fd5 100644 --- a/tests/api/client/test_local_client.py +++ b/tests/api/client/test_local_client.py @@ -69,6 +69,7 @@ def test_trigger_dag(self, mock): self.client.trigger_dag(dag_id=test_dag_id) mock.assert_called_once_with( run_id=run_id, + start_date=timezone.utcnow(), execution_date=EXECDATE_NOFRACTIONS, state=State.RUNNING, conf=None, @@ -81,6 +82,7 @@ def test_trigger_dag(self, mock): self.client.trigger_dag(dag_id=test_dag_id, execution_date=EXECDATE) mock.assert_called_once_with( run_id=run_id, + start_date=timezone.utcnow(), execution_date=EXECDATE_NOFRACTIONS, state=State.RUNNING, conf=None, @@ -94,6 +96,7 @@ def test_trigger_dag(self, mock): self.client.trigger_dag(dag_id=test_dag_id, run_id=custom_run_id) mock.assert_called_once_with( run_id=custom_run_id, + start_date=timezone.utcnow(), execution_date=EXECDATE_NOFRACTIONS, state=State.RUNNING, conf=None, @@ -107,6 +110,7 @@ def test_trigger_dag(self, mock): self.client.trigger_dag(dag_id=test_dag_id, conf=conf) mock.assert_called_once_with( run_id=run_id, + start_date=timezone.utcnow(), execution_date=EXECDATE_NOFRACTIONS, state=State.RUNNING, conf=json.loads(conf), From 44df3d24dd793e7abdf3a2d7afbc512ca52f0650 Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Tue, 14 Sep 2021 12:08:20 +0100 Subject: [PATCH 03/11] Apply suggestions from code review --- tests/operators/test_trigger_dagrun.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/operators/test_trigger_dagrun.py b/tests/operators/test_trigger_dagrun.py index 72720d1a52c7b..5d184dc30386e 100644 --- a/tests/operators/test_trigger_dagrun.py +++ b/tests/operators/test_trigger_dagrun.py @@ -285,4 +285,3 @@ def test_trigger_dagrun_triggering_itself_with_execution_date(self): with pytest.raises(DagRunAlreadyExists): task.run(start_date=execution_date, end_date=execution_date) - From d420b7a67a5f6d92ec7e71646e171e2beeb81223 Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Tue, 14 Sep 2021 20:05:52 +0100 Subject: [PATCH 04/11] Apply suggestions from code review --- tests/operators/test_trigger_dagrun.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/operators/test_trigger_dagrun.py b/tests/operators/test_trigger_dagrun.py index 5d184dc30386e..8581c94acbc41 100644 --- a/tests/operators/test_trigger_dagrun.py +++ b/tests/operators/test_trigger_dagrun.py @@ -284,4 +284,3 @@ def test_trigger_dagrun_triggering_itself_with_execution_date(self): ) with pytest.raises(DagRunAlreadyExists): task.run(start_date=execution_date, end_date=execution_date) - From 3932be798fce9b73fd5463deaf7fb6af324867c6 Mon Sep 17 00:00:00 2001 From: Bhavani Ravi Date: Wed, 15 Sep 2021 12:55:55 +0530 Subject: [PATCH 05/11] fix static code check --- tests/operators/test_trigger_dagrun.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/operators/test_trigger_dagrun.py b/tests/operators/test_trigger_dagrun.py index 8581c94acbc41..c41f196d33fb7 100644 --- a/tests/operators/test_trigger_dagrun.py +++ b/tests/operators/test_trigger_dagrun.py @@ -273,7 +273,8 @@ def test_trigger_dagrun_triggering_itself(self): assert isinstance(dagruns[1].start_date, datetime) def test_trigger_dagrun_triggering_itself_with_execution_date(self): - """Test TriggerDagRunOperator that triggers itself with execution date, fails with DagRunAlreadyExists""" + """Test TriggerDagRunOperator that triggers itself with execution date, + fails with DagRunAlreadyExists""" execution_date = DEFAULT_DATE task = TriggerDagRunOperator( task_id="test_task", From 825c0d46d267e00bf398a10c8b2806d5b305aea1 Mon Sep 17 00:00:00 2001 From: Bhavani Ravi Date: Tue, 14 Sep 2021 12:33:27 +0530 Subject: [PATCH 06/11] Add start date to trigger_dagrun operator --- tests/operators/test_trigger_dagrun.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/operators/test_trigger_dagrun.py b/tests/operators/test_trigger_dagrun.py index c41f196d33fb7..489a96b31737f 100644 --- a/tests/operators/test_trigger_dagrun.py +++ b/tests/operators/test_trigger_dagrun.py @@ -284,4 +284,4 @@ def test_trigger_dagrun_triggering_itself_with_execution_date(self): dag=self.dag, ) with pytest.raises(DagRunAlreadyExists): - task.run(start_date=execution_date, end_date=execution_date) + task.run(start_date=execution_date, end_date=execution_date) \ No newline at end of file From 434aeafbea104245728fc6976fdb2669d9d72433 Mon Sep 17 00:00:00 2001 From: Bhavani Ravi Date: Thu, 16 Sep 2021 10:46:02 +0530 Subject: [PATCH 07/11] fix static check --- tests/operators/test_trigger_dagrun.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/operators/test_trigger_dagrun.py b/tests/operators/test_trigger_dagrun.py index 489a96b31737f..c41f196d33fb7 100644 --- a/tests/operators/test_trigger_dagrun.py +++ b/tests/operators/test_trigger_dagrun.py @@ -284,4 +284,4 @@ def test_trigger_dagrun_triggering_itself_with_execution_date(self): dag=self.dag, ) with pytest.raises(DagRunAlreadyExists): - task.run(start_date=execution_date, end_date=execution_date) \ No newline at end of file + task.run(start_date=execution_date, end_date=execution_date) From eb11898e1ee63eddcdfe097a3c8198a0ed8ca303 Mon Sep 17 00:00:00 2001 From: Bhavani Ravi Date: Thu, 16 Sep 2021 15:47:59 +0530 Subject: [PATCH 08/11] change state from to when triggering dag --- airflow/api/common/experimental/trigger_dag.py | 3 +-- tests/api/client/test_local_client.py | 12 ++++-------- tests/operators/test_trigger_dagrun.py | 3 ++- 3 files changed, 7 insertions(+), 11 deletions(-) diff --git a/airflow/api/common/experimental/trigger_dag.py b/airflow/api/common/experimental/trigger_dag.py index cd6c7defb193d..e7dbbf19d21e7 100644 --- a/airflow/api/common/experimental/trigger_dag.py +++ b/airflow/api/common/experimental/trigger_dag.py @@ -82,9 +82,8 @@ def _trigger_dag( for _dag in dags_to_trigger: trigger = _dag.create_dagrun( run_id=run_id, - start_date=timezone.utcnow(), execution_date=execution_date, - state=State.RUNNING, + state=State.QUEUED, conf=run_conf, external_trigger=True, dag_hash=dag_bag.dags_hash.get(dag_id), diff --git a/tests/api/client/test_local_client.py b/tests/api/client/test_local_client.py index 8a04c741a6fd5..a2af8ca245e6b 100644 --- a/tests/api/client/test_local_client.py +++ b/tests/api/client/test_local_client.py @@ -69,9 +69,8 @@ def test_trigger_dag(self, mock): self.client.trigger_dag(dag_id=test_dag_id) mock.assert_called_once_with( run_id=run_id, - start_date=timezone.utcnow(), execution_date=EXECDATE_NOFRACTIONS, - state=State.RUNNING, + state=State.QUEUED, conf=None, external_trigger=True, dag_hash=ANY, @@ -82,9 +81,8 @@ def test_trigger_dag(self, mock): self.client.trigger_dag(dag_id=test_dag_id, execution_date=EXECDATE) mock.assert_called_once_with( run_id=run_id, - start_date=timezone.utcnow(), execution_date=EXECDATE_NOFRACTIONS, - state=State.RUNNING, + state=State.QUEUED, conf=None, external_trigger=True, dag_hash=ANY, @@ -96,9 +94,8 @@ def test_trigger_dag(self, mock): self.client.trigger_dag(dag_id=test_dag_id, run_id=custom_run_id) mock.assert_called_once_with( run_id=custom_run_id, - start_date=timezone.utcnow(), execution_date=EXECDATE_NOFRACTIONS, - state=State.RUNNING, + state=State.QUEUED, conf=None, external_trigger=True, dag_hash=ANY, @@ -110,9 +107,8 @@ def test_trigger_dag(self, mock): self.client.trigger_dag(dag_id=test_dag_id, conf=conf) mock.assert_called_once_with( run_id=run_id, - start_date=timezone.utcnow(), execution_date=EXECDATE_NOFRACTIONS, - state=State.RUNNING, + state=State.QUEUED, conf=json.loads(conf), external_trigger=True, dag_hash=ANY, diff --git a/tests/operators/test_trigger_dagrun.py b/tests/operators/test_trigger_dagrun.py index c41f196d33fb7..56d3e41794dcc 100644 --- a/tests/operators/test_trigger_dagrun.py +++ b/tests/operators/test_trigger_dagrun.py @@ -270,7 +270,8 @@ def test_trigger_dagrun_triggering_itself(self): with create_session() as session: dagruns = session.query(DagRun).filter(DagRun.dag_id == self.dag.dag_id).all() assert len(dagruns) == 2 - assert isinstance(dagruns[1].start_date, datetime) + dagruns[1].set_state() + # assert isinstance(dagruns[1].start_date, datetime) def test_trigger_dagrun_triggering_itself_with_execution_date(self): """Test TriggerDagRunOperator that triggers itself with execution date, From b2832626f7bd03dc74199abdf36cf861c08d26d2 Mon Sep 17 00:00:00 2001 From: Bhavani Ravi Date: Mon, 20 Sep 2021 16:01:21 +0530 Subject: [PATCH 09/11] add state check to test cases --- tests/operators/test_trigger_dagrun.py | 3 +-- tests/www/api/experimental/test_dag_runs_endpoint.py | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/tests/operators/test_trigger_dagrun.py b/tests/operators/test_trigger_dagrun.py index 56d3e41794dcc..5353dac24fe51 100644 --- a/tests/operators/test_trigger_dagrun.py +++ b/tests/operators/test_trigger_dagrun.py @@ -270,8 +270,7 @@ def test_trigger_dagrun_triggering_itself(self): with create_session() as session: dagruns = session.query(DagRun).filter(DagRun.dag_id == self.dag.dag_id).all() assert len(dagruns) == 2 - dagruns[1].set_state() - # assert isinstance(dagruns[1].start_date, datetime) + assert dagruns[1].state == State.QUEUED def test_trigger_dagrun_triggering_itself_with_execution_date(self): """Test TriggerDagRunOperator that triggers itself with execution date, diff --git a/tests/www/api/experimental/test_dag_runs_endpoint.py b/tests/www/api/experimental/test_dag_runs_endpoint.py index f738f63479f80..70f81da31495d 100644 --- a/tests/www/api/experimental/test_dag_runs_endpoint.py +++ b/tests/www/api/experimental/test_dag_runs_endpoint.py @@ -62,7 +62,7 @@ def test_get_dag_runs_success(self): assert data[0]['id'] == dag_run.id def test_get_dag_runs_success_with_state_parameter(self): - url_template = '/api/experimental/dags/{}/dag_runs?state=running' + url_template = '/api/experimental/dags/{}/dag_runs?state=queued' dag_id = 'example_bash_operator' # Create DagRun dag_run = trigger_dag(dag_id=dag_id, run_id='test_get_dag_runs_success') @@ -77,7 +77,7 @@ def test_get_dag_runs_success_with_state_parameter(self): assert data[0]['id'] == dag_run.id def test_get_dag_runs_success_with_capital_state_parameter(self): - url_template = '/api/experimental/dags/{}/dag_runs?state=RUNNING' + url_template = '/api/experimental/dags/{}/dag_runs?state=QUEUED' dag_id = 'example_bash_operator' # Create DagRun dag_run = trigger_dag(dag_id=dag_id, run_id='test_get_dag_runs_success') From c17bb0066d907575dba8c9efe7957fbbb1696bfc Mon Sep 17 00:00:00 2001 From: Bhavani Ravi Date: Tue, 21 Sep 2021 16:26:38 +0530 Subject: [PATCH 10/11] fix testcase failure, change allowed_states to QUEUED --- tests/operators/test_trigger_dagrun.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/tests/operators/test_trigger_dagrun.py b/tests/operators/test_trigger_dagrun.py index 5353dac24fe51..cb8749b9ef92b 100644 --- a/tests/operators/test_trigger_dagrun.py +++ b/tests/operators/test_trigger_dagrun.py @@ -232,7 +232,7 @@ def test_trigger_dagrun_with_wait_for_completion_true(self): execution_date=execution_date, wait_for_completion=True, poke_interval=10, - allowed_states=[State.RUNNING], + allowed_states=[State.QUEUED], dag=self.dag, ) task.run(start_date=execution_date, end_date=execution_date) @@ -250,7 +250,7 @@ def test_trigger_dagrun_with_wait_for_completion_true_fail(self): execution_date=execution_date, wait_for_completion=True, poke_interval=10, - failed_states=[State.RUNNING], + failed_states=[State.QUEUED], dag=self.dag, ) with pytest.raises(AirflowException): @@ -262,7 +262,6 @@ def test_trigger_dagrun_triggering_itself(self): task = TriggerDagRunOperator( task_id="test_task", trigger_dag_id=self.dag.dag_id, - allowed_states=[State.RUNNING, State.SUCCESS], dag=self.dag, ) task.run(start_date=execution_date, end_date=execution_date) @@ -280,7 +279,6 @@ def test_trigger_dagrun_triggering_itself_with_execution_date(self): task_id="test_task", trigger_dag_id=self.dag.dag_id, execution_date=execution_date, - allowed_states=[State.RUNNING, State.SUCCESS], dag=self.dag, ) with pytest.raises(DagRunAlreadyExists): From 45b9d4f92cfd24dac6a0ab01d397a7737fa9b48c Mon Sep 17 00:00:00 2001 From: Bhavani Ravi Date: Wed, 22 Sep 2021 11:58:29 +0530 Subject: [PATCH 11/11] fix test, order dagrun by execution date --- tests/operators/test_trigger_dagrun.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tests/operators/test_trigger_dagrun.py b/tests/operators/test_trigger_dagrun.py index cb8749b9ef92b..422f74853f6b1 100644 --- a/tests/operators/test_trigger_dagrun.py +++ b/tests/operators/test_trigger_dagrun.py @@ -267,7 +267,12 @@ def test_trigger_dagrun_triggering_itself(self): task.run(start_date=execution_date, end_date=execution_date) with create_session() as session: - dagruns = session.query(DagRun).filter(DagRun.dag_id == self.dag.dag_id).all() + dagruns = ( + session.query(DagRun) + .filter(DagRun.dag_id == self.dag.dag_id) + .order_by(DagRun.execution_date) + .all() + ) assert len(dagruns) == 2 assert dagruns[1].state == State.QUEUED