From 2c92b02a3d89009fa43c9143ab5392d17919f859 Mon Sep 17 00:00:00 2001 From: Ash Berlin-Taylor Date: Sat, 5 Dec 2020 21:56:51 +0000 Subject: [PATCH] Don't emit first_task_scheduling_delay metric for only-once dags (#12835) Dags with a schedule interval of None, or `@once` don't have a following schedule, so we can't realistically calculate this metric. Additionally, this changes the emitted metric from seconds to milliseconds -- all timers to statsd should be in milliseconds -- this is what Statsd and apps that consume data from there expect. See #10629 for more details. This will be a "breaking" change from 1.10.14, where the metric was back-ported to, but was (incorrectly) emitting seconds. (cherry picked from commit 4a02e0a287f880eab98979de565c061747b35f27) --- airflow/models/dagrun.py | 22 +++++++----- docs/metrics.rst | 2 +- tests/models/test_dagrun.py | 67 +++++++++++++++++++++++++------------ 3 files changed, 61 insertions(+), 30 deletions(-) diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index 8ba7f4e6439d4..02e9c89bdfe7f 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -371,14 +371,20 @@ def _emit_true_scheduling_delay_stats_for_finished_state(self, finished_tis): Note, the stat will only be emitted if the DagRun is a scheduler triggered one (i.e. external_trigger is False). """ + if self.state == State.RUNNING: + return + if self.external_trigger: + return + if not finished_tis: + return + try: - if self.state == State.RUNNING: - return - if self.external_trigger: - return - if not finished_tis: - return dag = self.get_dag() + + if not self.dag.schedule_interval or self.dag.schedule_interval == "@once": + # We can't emit this metric if there is no following schedule to calculate from! 
+ return + ordered_tis_by_start_date = [ti for ti in finished_tis if ti.start_date] ordered_tis_by_start_date.sort(key=lambda ti: ti.start_date, reverse=False) first_start_date = ordered_tis_by_start_date[0].start_date @@ -386,8 +392,8 @@ def _emit_true_scheduling_delay_stats_for_finished_state(self, finished_tis): # dag.following_schedule calculates the expected start datetime for a scheduled dagrun # i.e. a daily flow for execution date 1/1/20 actually runs on 1/2/20 hh:mm:ss, # and ti.start_date will be 1/2/20 hh:mm:ss so the following schedule is comparison - true_delay = (first_start_date - dag.following_schedule(self.execution_date)).total_seconds() - if true_delay >= 0: + true_delay = first_start_date - dag.following_schedule(self.execution_date) + if true_delay.total_seconds() > 0: Stats.timing('dagrun.{}.first_task_scheduling_delay'.format(dag.dag_id), true_delay) except Exception as e: self.log.warning('Failed to record first_task_scheduling_delay metric:\n', e) diff --git a/docs/metrics.rst b/docs/metrics.rst index 82e62b0ffabc8..ee425250980bd 100644 --- a/docs/metrics.rst +++ b/docs/metrics.rst @@ -108,5 +108,5 @@ Name Description ``dagrun.duration.failed.`` Milliseconds taken for a DagRun to reach failed state ``dagrun.schedule_delay.`` Milliseconds of delay between the scheduled DagRun start date and the actual DagRun start date -``dagrun..first_task_scheduling_delay`` Seconds elapsed between first task start_date and dagrun expected start +``dagrun..first_task_scheduling_delay`` Milliseconds elapsed between first task start_date and dagrun expected start ================================================= ======================================================================= diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py index 1f627c58ef050..98980a1031570 100644 --- a/tests/models/test_dagrun.py +++ b/tests/models/test_dagrun.py @@ -34,6 +34,7 @@ from airflow.utils.trigger_rule import TriggerRule from tests.compat import mock, 
call from tests.models import DEFAULT_DATE +from tests.test_utils import db class DagRunTest(unittest.TestCase): @@ -628,31 +629,55 @@ def test_no_scheduling_delay_for_nonscheduled_runs(self, stats_mock): self.assertNotIn(call('dagrun.{}.first_task_scheduling_delay'.format(dag.dag_id)), stats_mock.mock_calls) - @mock.patch.object(Stats, 'timing') - def test_emit_scheduling_delay(self, stats_mock): + @parameterized.expand( + [ + ("*/5 * * * *", True), + (None, False), + ("@once", False), + ] + ) + def test_emit_scheduling_delay(self, schedule_interval, expected): """ Tests that dag scheduling delay stat is set properly once running scheduled dag. dag_run.update_state() invokes the _emit_true_scheduling_delay_stats_for_finished_state method. """ - dag = DAG(dag_id='test_emit_dag_stats', start_date=days_ago(1)) + # Cleanup + db.clear_db_runs() + db.clear_db_dags() + + dag = DAG(dag_id='test_emit_dag_stats', start_date=days_ago(1), schedule_interval=schedule_interval) dag_task = DummyOperator(task_id='dummy', dag=dag, owner='airflow') session = settings.Session() - orm_dag = DagModel(dag_id=dag.dag_id, is_active=True) - session.add(orm_dag) - session.flush() - dag_run = dag.create_dagrun( - run_id="test", - state=State.SUCCESS, - execution_date=dag.start_date, - start_date=dag.start_date, - session=session, - ) - ti = dag_run.get_task_instance(dag_task.task_id) - ti.set_state(State.SUCCESS, session) - session.commit() - session.close() - dag_run.update_state() - true_delay = (ti.start_date - dag.following_schedule(dag_run.execution_date)).total_seconds() - sched_delay_stat_call = call('dagrun.{}.first_task_scheduling_delay'.format(dag.dag_id), true_delay) - self.assertIn(sched_delay_stat_call, stats_mock.mock_calls) + try: + orm_dag = DagModel(dag_id=dag.dag_id, is_active=True) + session.add(orm_dag) + session.flush() + dag_run = dag.create_dagrun( + run_id="test", + state=State.SUCCESS, + execution_date=dag.start_date, + start_date=dag.start_date, + session=session, + 
) + ti = dag_run.get_task_instance(dag_task.task_id, session) + ti.set_state(State.SUCCESS, session) + session.flush() + + with mock.patch.object(Stats, 'timing') as stats_mock: + dag_run.update_state(session) + + metric_name = 'dagrun.{}.first_task_scheduling_delay'.format(dag.dag_id) + + if expected: + true_delay = (ti.start_date - dag.following_schedule(dag_run.execution_date)) + sched_delay_stat_call = call(metric_name, true_delay) + assert sched_delay_stat_call in stats_mock.mock_calls + else: + # Assert that we never passed the metric + sched_delay_stat_call = call(metric_name, mock.ANY) + assert sched_delay_stat_call not in stats_mock.mock_calls + finally: + # Don't write anything to the DB + session.rollback() + session.close()