Skip to content

Commit

Permalink
Add start date to trigger_dagrun operator (#18226)
Browse files Browse the repository at this point in the history
closes: #18082
  • Loading branch information
bhavaniravi authored Sep 24, 2021
1 parent 1d2924c commit 6609e9a
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 9 deletions.
2 changes: 1 addition & 1 deletion airflow/api/common/experimental/trigger_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def _trigger_dag(
trigger = _dag.create_dagrun(
run_id=run_id,
execution_date=execution_date,
state=State.RUNNING,
state=State.QUEUED,
conf=run_conf,
external_trigger=True,
dag_hash=dag_bag.dags_hash.get(dag_id),
Expand Down
8 changes: 4 additions & 4 deletions tests/api/client/test_local_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def test_trigger_dag(self, mock):
mock.assert_called_once_with(
run_id=run_id,
execution_date=EXECDATE_NOFRACTIONS,
state=State.RUNNING,
state=State.QUEUED,
conf=None,
external_trigger=True,
dag_hash=ANY,
Expand All @@ -82,7 +82,7 @@ def test_trigger_dag(self, mock):
mock.assert_called_once_with(
run_id=run_id,
execution_date=EXECDATE_NOFRACTIONS,
state=State.RUNNING,
state=State.QUEUED,
conf=None,
external_trigger=True,
dag_hash=ANY,
Expand All @@ -95,7 +95,7 @@ def test_trigger_dag(self, mock):
mock.assert_called_once_with(
run_id=custom_run_id,
execution_date=EXECDATE_NOFRACTIONS,
state=State.RUNNING,
state=State.QUEUED,
conf=None,
external_trigger=True,
dag_hash=ANY,
Expand All @@ -108,7 +108,7 @@ def test_trigger_dag(self, mock):
mock.assert_called_once_with(
run_id=run_id,
execution_date=EXECDATE_NOFRACTIONS,
state=State.RUNNING,
state=State.QUEUED,
conf=json.loads(conf),
external_trigger=True,
dag_hash=ANY,
Expand Down
37 changes: 35 additions & 2 deletions tests/operators/test_trigger_dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ def test_trigger_dagrun_with_wait_for_completion_true(self):
execution_date=execution_date,
wait_for_completion=True,
poke_interval=10,
allowed_states=[State.RUNNING],
allowed_states=[State.QUEUED],
dag=self.dag,
)
task.run(start_date=execution_date, end_date=execution_date)
Expand All @@ -250,8 +250,41 @@ def test_trigger_dagrun_with_wait_for_completion_true_fail(self):
execution_date=execution_date,
wait_for_completion=True,
poke_interval=10,
failed_states=[State.RUNNING],
failed_states=[State.QUEUED],
dag=self.dag,
)
with pytest.raises(AirflowException):
task.run(start_date=execution_date, end_date=execution_date)

def test_trigger_dagrun_triggering_itself(self):
"""Test TriggerDagRunOperator that triggers itself"""
execution_date = DEFAULT_DATE
task = TriggerDagRunOperator(
task_id="test_task",
trigger_dag_id=self.dag.dag_id,
dag=self.dag,
)
task.run(start_date=execution_date, end_date=execution_date)

with create_session() as session:
dagruns = (
session.query(DagRun)
.filter(DagRun.dag_id == self.dag.dag_id)
.order_by(DagRun.execution_date)
.all()
)
assert len(dagruns) == 2
assert dagruns[1].state == State.QUEUED

def test_trigger_dagrun_triggering_itself_with_execution_date(self):
"""Test TriggerDagRunOperator that triggers itself with execution date,
fails with DagRunAlreadyExists"""
execution_date = DEFAULT_DATE
task = TriggerDagRunOperator(
task_id="test_task",
trigger_dag_id=self.dag.dag_id,
execution_date=execution_date,
dag=self.dag,
)
with pytest.raises(DagRunAlreadyExists):
task.run(start_date=execution_date, end_date=execution_date)
4 changes: 2 additions & 2 deletions tests/www/api/experimental/test_dag_runs_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def test_get_dag_runs_success(self):
assert data[0]['id'] == dag_run.id

def test_get_dag_runs_success_with_state_parameter(self):
url_template = '/api/experimental/dags/{}/dag_runs?state=running'
url_template = '/api/experimental/dags/{}/dag_runs?state=queued'
dag_id = 'example_bash_operator'
# Create DagRun
dag_run = trigger_dag(dag_id=dag_id, run_id='test_get_dag_runs_success')
Expand All @@ -77,7 +77,7 @@ def test_get_dag_runs_success_with_state_parameter(self):
assert data[0]['id'] == dag_run.id

def test_get_dag_runs_success_with_capital_state_parameter(self):
url_template = '/api/experimental/dags/{}/dag_runs?state=RUNNING'
url_template = '/api/experimental/dags/{}/dag_runs?state=QUEUED'
dag_id = 'example_bash_operator'
# Create DagRun
dag_run = trigger_dag(dag_id=dag_id, run_id='test_get_dag_runs_success')
Expand Down

0 comments on commit 6609e9a

Please sign in to comment.