Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add start date to trigger_dagrun operator #18226

Merged
merged 11 commits into from
Sep 24, 2021
2 changes: 1 addition & 1 deletion airflow/api/common/experimental/trigger_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def _trigger_dag(
trigger = _dag.create_dagrun(
run_id=run_id,
execution_date=execution_date,
state=State.RUNNING,
state=State.QUEUED,
conf=run_conf,
external_trigger=True,
dag_hash=dag_bag.dags_hash.get(dag_id),
Expand Down
8 changes: 4 additions & 4 deletions tests/api/client/test_local_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def test_trigger_dag(self, mock):
mock.assert_called_once_with(
run_id=run_id,
execution_date=EXECDATE_NOFRACTIONS,
state=State.RUNNING,
state=State.QUEUED,
conf=None,
external_trigger=True,
dag_hash=ANY,
Expand All @@ -82,7 +82,7 @@ def test_trigger_dag(self, mock):
mock.assert_called_once_with(
run_id=run_id,
execution_date=EXECDATE_NOFRACTIONS,
state=State.RUNNING,
state=State.QUEUED,
conf=None,
external_trigger=True,
dag_hash=ANY,
Expand All @@ -95,7 +95,7 @@ def test_trigger_dag(self, mock):
mock.assert_called_once_with(
run_id=custom_run_id,
execution_date=EXECDATE_NOFRACTIONS,
state=State.RUNNING,
state=State.QUEUED,
conf=None,
external_trigger=True,
dag_hash=ANY,
Expand All @@ -108,7 +108,7 @@ def test_trigger_dag(self, mock):
mock.assert_called_once_with(
run_id=run_id,
execution_date=EXECDATE_NOFRACTIONS,
state=State.RUNNING,
state=State.QUEUED,
conf=json.loads(conf),
external_trigger=True,
dag_hash=ANY,
Expand Down
37 changes: 35 additions & 2 deletions tests/operators/test_trigger_dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ def test_trigger_dagrun_with_wait_for_completion_true(self):
execution_date=execution_date,
wait_for_completion=True,
poke_interval=10,
allowed_states=[State.RUNNING],
allowed_states=[State.QUEUED],
dag=self.dag,
)
task.run(start_date=execution_date, end_date=execution_date)
Expand All @@ -250,8 +250,41 @@ def test_trigger_dagrun_with_wait_for_completion_true_fail(self):
execution_date=execution_date,
wait_for_completion=True,
poke_interval=10,
failed_states=[State.RUNNING],
failed_states=[State.QUEUED],
dag=self.dag,
)
with pytest.raises(AirflowException):
task.run(start_date=execution_date, end_date=execution_date)

def test_trigger_dagrun_triggering_itself(self):
"""Test TriggerDagRunOperator that triggers itself"""
execution_date = DEFAULT_DATE
task = TriggerDagRunOperator(
task_id="test_task",
trigger_dag_id=self.dag.dag_id,
dag=self.dag,
)
task.run(start_date=execution_date, end_date=execution_date)

with create_session() as session:
dagruns = (
session.query(DagRun)
.filter(DagRun.dag_id == self.dag.dag_id)
.order_by(DagRun.execution_date)
.all()
)
assert len(dagruns) == 2
assert dagruns[1].state == State.QUEUED

def test_trigger_dagrun_triggering_itself_with_execution_date(self):
"""Test TriggerDagRunOperator that triggers itself with execution date,
fails with DagRunAlreadyExists"""
execution_date = DEFAULT_DATE
task = TriggerDagRunOperator(
task_id="test_task",
trigger_dag_id=self.dag.dag_id,
execution_date=execution_date,
dag=self.dag,
)
with pytest.raises(DagRunAlreadyExists):
task.run(start_date=execution_date, end_date=execution_date)
4 changes: 2 additions & 2 deletions tests/www/api/experimental/test_dag_runs_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def test_get_dag_runs_success(self):
assert data[0]['id'] == dag_run.id

def test_get_dag_runs_success_with_state_parameter(self):
url_template = '/api/experimental/dags/{}/dag_runs?state=running'
url_template = '/api/experimental/dags/{}/dag_runs?state=queued'
dag_id = 'example_bash_operator'
# Create DagRun
dag_run = trigger_dag(dag_id=dag_id, run_id='test_get_dag_runs_success')
Expand All @@ -77,7 +77,7 @@ def test_get_dag_runs_success_with_state_parameter(self):
assert data[0]['id'] == dag_run.id

def test_get_dag_runs_success_with_capital_state_parameter(self):
url_template = '/api/experimental/dags/{}/dag_runs?state=RUNNING'
url_template = '/api/experimental/dags/{}/dag_runs?state=QUEUED'
dag_id = 'example_bash_operator'
# Create DagRun
dag_run = trigger_dag(dag_id=dag_id, run_id='test_get_dag_runs_success')
Expand Down