Skip to content

Commit

Permalink
Merge pull request #1 from ayush-san/master
Browse files Browse the repository at this point in the history
optmizing dag and task duration calculation
  • Loading branch information
ankitatreja-z authored Mar 23, 2020
2 parents 43862bf + 051c920 commit 25db5da
Showing 1 changed file with 12 additions and 5 deletions.
17 changes: 12 additions & 5 deletions airflow_prometheus_exporter/prometheus_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
import json
import pickle
from contextlib import contextmanager
from datetime import timedelta

from airflow.configuration import conf
from airflow.models import DagModel, DagRun, TaskInstance, TaskFail, XCom
from airflow.plugins_manager import AirflowPlugin
from airflow.settings import RBAC, Session
from airflow.utils import timezone
from airflow.utils.state import State
from airflow.utils.log.logging_mixin import LoggingMixin
from flask import Response
Expand All @@ -17,7 +19,7 @@

from airflow_prometheus_exporter.xcom_config import load_xcom_config

CANARY_DAG = "canary_dag"
AIRFLOW_LOG_CLEANUP = "airflow-log-cleanup"


@contextmanager
Expand Down Expand Up @@ -76,6 +78,7 @@ def get_dag_duration_info():
DagModel.is_paused == False,
DagRun.state == State.SUCCESS,
DagRun.end_date.isnot(None),
DagRun.end_date >= (timezone.utcnow() - timedelta(minutes=5))
)
.group_by(DagRun.dag_id)
.subquery()
Expand All @@ -99,6 +102,10 @@ def get_dag_duration_info():
),
),
)
.filter(
TaskInstance.start_date.isnot(None),
TaskInstance.end_date.isnot(None),
)
.group_by(
max_execution_dt_query.c.dag_id,
max_execution_dt_query.c.max_execution_dt,
Expand Down Expand Up @@ -245,6 +252,7 @@ def get_task_duration_info():
DagModel.is_paused == False,
DagRun.state == State.SUCCESS,
DagRun.end_date.isnot(None),
DagRun.end_date >= (timezone.utcnow() - timedelta(minutes=5))
)
.group_by(DagRun.dag_id)
.subquery()
Expand Down Expand Up @@ -289,7 +297,7 @@ def get_dag_scheduler_delay():
session.query(
DagRun.dag_id, DagRun.execution_date, DagRun.start_date,
)
.filter(DagRun.dag_id == CANARY_DAG,)
.filter(DagRun.dag_id == AIRFLOW_LOG_CLEANUP, )
.order_by(DagRun.execution_date.desc())
.limit(1)
.all()
Expand All @@ -305,7 +313,7 @@ def get_task_scheduler_delay():
func.max(TaskInstance.start_date).label("max_start"),
)
.filter(
TaskInstance.dag_id == CANARY_DAG,
TaskInstance.dag_id == AIRFLOW_LOG_CLEANUP,
TaskInstance.queued_dttm.isnot(None),
)
.group_by(TaskInstance.queue)
Expand All @@ -327,7 +335,7 @@ def get_task_scheduler_delay():
)
.filter(
TaskInstance.dag_id
== CANARY_DAG, # Redundant, for performance.
== AIRFLOW_LOG_CLEANUP, # Redundant, for performance.
)
.all()
)
Expand Down Expand Up @@ -479,7 +487,6 @@ def collect(self):
if RBAC:
from flask_appbuilder import BaseView as FABBaseView, expose as FABexpose


class RBACMetrics(FABBaseView):
route_base = "/admin/metrics/"

Expand Down

0 comments on commit 25db5da

Please sign in to comment.