From 078a151cace222499da14c4bad5d5eabf377aa11 Mon Sep 17 00:00:00 2001 From: Abhishek Ray Date: Tue, 10 Mar 2020 17:55:58 -0700 Subject: [PATCH 1/2] Don't include tasks with no start date --- airflow_prometheus_exporter/prometheus_exporter.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/airflow_prometheus_exporter/prometheus_exporter.py b/airflow_prometheus_exporter/prometheus_exporter.py index a0d24ee..232d458 100644 --- a/airflow_prometheus_exporter/prometheus_exporter.py +++ b/airflow_prometheus_exporter/prometheus_exporter.py @@ -120,6 +120,10 @@ def get_dag_duration_info(): == dag_start_dt_query.c.execution_date, ), ) + .filter( + TaskInstance.start_date.isnot(None), + TaskInstance.end_date.isnot(None), + ) .all() ) @@ -479,7 +483,6 @@ def collect(self): if RBAC: from flask_appbuilder import BaseView as FABBaseView, expose as FABexpose - class RBACMetrics(FABBaseView): route_base = "/admin/metrics/" From 051c920fe4db824f838274faa976116e806cac70 Mon Sep 17 00:00:00 2001 From: Ayush Chauhan Date: Fri, 20 Mar 2020 15:22:07 +0530 Subject: [PATCH 2/2] optmizing dag and task duration calculation --- .../prometheus_exporter.py | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/airflow_prometheus_exporter/prometheus_exporter.py b/airflow_prometheus_exporter/prometheus_exporter.py index 232d458..f7ac89f 100644 --- a/airflow_prometheus_exporter/prometheus_exporter.py +++ b/airflow_prometheus_exporter/prometheus_exporter.py @@ -2,11 +2,13 @@ import json import pickle from contextlib import contextmanager +from datetime import timedelta from airflow.configuration import conf from airflow.models import DagModel, DagRun, TaskInstance, TaskFail, XCom from airflow.plugins_manager import AirflowPlugin from airflow.settings import RBAC, Session +from airflow.utils import timezone from airflow.utils.state import State from airflow.utils.log.logging_mixin import LoggingMixin from flask import Response @@ -17,7 +19,7 @@ from airflow_prometheus_exporter.xcom_config import load_xcom_config -CANARY_DAG = "canary_dag" +AIRFLOW_LOG_CLEANUP = "airflow-log-cleanup" @contextmanager @@ -76,6 +78,7 @@ def get_dag_duration_info(): DagModel.is_paused == False, DagRun.state == State.SUCCESS, DagRun.end_date.isnot(None), + DagRun.end_date >= (timezone.utcnow() - timedelta(minutes=5)) ) .group_by(DagRun.dag_id) .subquery() @@ -99,6 +102,10 @@ def get_dag_duration_info(): ), ), ) + .filter( + TaskInstance.start_date.isnot(None), + TaskInstance.end_date.isnot(None), + ) .group_by( max_execution_dt_query.c.dag_id, max_execution_dt_query.c.max_execution_dt, @@ -120,10 +127,6 @@ def get_dag_duration_info(): == dag_start_dt_query.c.execution_date, ), ) - .filter( - TaskInstance.start_date.isnot(None), - TaskInstance.end_date.isnot(None), - ) .all() ) @@ -249,6 +252,7 @@ def get_task_duration_info(): DagModel.is_paused == False, DagRun.state == State.SUCCESS, DagRun.end_date.isnot(None), + DagRun.end_date >= (timezone.utcnow() - timedelta(minutes=5)) ) .group_by(DagRun.dag_id) .subquery() @@ -293,7 +297,7 @@ def get_dag_scheduler_delay(): session.query( DagRun.dag_id, DagRun.execution_date, DagRun.start_date, ) - .filter(DagRun.dag_id == CANARY_DAG,) + .filter(DagRun.dag_id == AIRFLOW_LOG_CLEANUP, ) .order_by(DagRun.execution_date.desc()) .limit(1) .all() @@ -309,7 +313,7 @@ def get_task_scheduler_delay(): func.max(TaskInstance.start_date).label("max_start"), ) .filter( - TaskInstance.dag_id == CANARY_DAG, + TaskInstance.dag_id == AIRFLOW_LOG_CLEANUP, TaskInstance.queued_dttm.isnot(None), ) .group_by(TaskInstance.queue) @@ -331,7 +335,7 @@ def get_task_scheduler_delay(): ) .filter( TaskInstance.dag_id - == CANARY_DAG, # Redundant, for performance. + == AIRFLOW_LOG_CLEANUP, # Redundant, for performance. ) .all() )