"""Airflow DAG ``congestion_refresh``: daily and monthly congestion aggregation.

Reconstructed from patch 06a1b19 ("#62 Add DAG to automate generate daily and
monthly tt"), which adds this file as ``dags/generate_congestion_agg.py``.

Flow: wait for the ``pull_here`` DAG, run ``congestion.generate_network_daily``
for ``yesterday_ds``, and, when ``yesterday_ds`` is the first day of a month,
verify that ``here.ta`` holds every day of the previous month before running
``congestion.generate_network_monthly``.
"""
from datetime import datetime, timedelta
import logging

from dateutil.relativedelta import relativedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator, ShortCircuitOperator
from airflow.hooks.postgres_hook import PostgresHook
from airflow.hooks.base_hook import BaseHook
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.sensors import ExternalTaskSensor
from airflow.operators.sql import SQLCheckOperator

from psycopg2 import sql
from psycopg2.extras import execute_values
from psycopg2 import connect, Error

LOGGER = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)

# Slack alert
SLACK_CONN_ID = 'slack'


def task_fail_slack_alert(context):
    """Post a Slack message naming the failed task with a link to its log.

    Registered as ``on_failure_callback`` in ``default_args``.

    Args:
        context: Airflow task context dict; its ``task_instance`` entry
            supplies the ``task_id`` and ``log_url`` used in the message.

    Returns:
        Whatever ``SlackWebhookOperator.execute`` returns.
    """
    task_instance = context.get('task_instance')
    slack_webhook_token = BaseHook.get_connection(SLACK_CONN_ID).password
    task_msg = '<@UF4RQFQ11> !!! {task_id} in congestion_refresh DAG failed.'.format(
        task_id=task_instance.task_id)
    slack_msg = task_msg + """(<{log_url}|log>)""".format(
        log_url=task_instance.log_url,)
    failed_alert = SlackWebhookOperator(
        task_id='slack_test',
        http_conn_id=SLACK_CONN_ID,
        webhook_token=slack_webhook_token,
        message=slack_msg,
        username='airflow',
        )
    return failed_alert.execute(context=context)


default_args = {'owner': 'natalie',
                'depends_on_past': False,
                'start_date': datetime(2022, 8, 10),
                'email': ['natalie.chan@toronto.ca'],
                'email_on_failure': False,
                'email_on_success': False,
                'retries': 0,
                'retry_delay': timedelta(minutes=5),
                'on_failure_callback': task_fail_slack_alert
                }

dag = DAG('congestion_refresh',
          default_args=default_args,
          schedule_interval='30 16 * * * ',  # same as pull_here task
          catchup=False,
          )


## Functions
def is_monday(date_to_pull):
    """Return True when ``date_to_pull`` (a ``YYYY-MM-DD`` string) is a Monday."""
    return datetime.strptime(date_to_pull, "%Y-%m-%d").weekday() == 0


def is_day_one(date_to_pull):
    """Return True when ``date_to_pull`` (``YYYY-MM-DD``) is the 1st of a month."""
    return datetime.strptime(date_to_pull, "%Y-%m-%d").day == 1


## Tasks ##
## ExternalTaskSensor to wait for pull_here
# dag=dag is passed explicitly, like every other task in this file, so that
# default_args (including the Slack failure callback) apply to the sensor too.
wait_for_here = ExternalTaskSensor(task_id='wait_for_here',
                                   external_dag_id='pull_here',
                                   external_task_id='pull_here',
                                   start_date=datetime(2020, 1, 5),
                                   failed_states=['failed'],
                                   dag=dag
                                   )

## ShortCircuitOperator Tasks, python_callable returns True or False; False means skip downstream tasks
check_dow = ShortCircuitOperator(
    task_id='check_dow',
    provide_context=False,
    python_callable=is_monday,
    op_kwargs={'date_to_pull': '{{ yesterday_ds }}'},
    dag=dag
    )

check_dom = ShortCircuitOperator(
    task_id='check_dom',
    provide_context=False,
    python_callable=is_day_one,
    op_kwargs={'date_to_pull': '{{ yesterday_ds }}'},
    dag=dag
    )

## SQLCheckOperator to check that all of last month's data is in here.ta before aggregating
# This task only runs when check_dom passes, i.e. yesterday_ds is the 1st of a
# month, so the month that has just completed is
# [yesterday_ds - 1 month, yesterday_ds). The query compares the number of
# distinct dates in that window with the number of days the window spans.
check_monthly = SQLCheckOperator(task_id='check_monthly',
                                 conn_id='congestion_bot',
                                 sql='''SELECT case when count(distinct dt) = extract('days' FROM ('{{ yesterday_ds }}'::date - ('{{ yesterday_ds }}'::date - interval '1 month')))
                                        then TRUE else FALSE end as counts
                                        from here.ta
                                        where dt >= '{{ yesterday_ds }}'::date - interval '1 month'
                                          and dt < '{{ yesterday_ds }}'::date''',
                                 dag=dag)

## Postgres Tasks
# Task to aggregate segment level tt daily
aggregate_daily = PostgresOperator(sql='''SELECT congestion.generate_network_daily('{{ yesterday_ds }}') ''',
                                   task_id='aggregate_daily',
                                   postgres_conn_id='congestion_bot',
                                   autocommit=True,
                                   retries=0,
                                   dag=dag)


# Task to aggregate segment level tt monthly
# The day before yesterday_ds falls in the month that just ended; ds_format
# rewrites it to that month's first day.
# NOTE(review): assumes generate_network_monthly expects the first day of the
# month to aggregate -- confirm against the SQL function definition.
aggregate_monthly = PostgresOperator(sql='''select congestion.generate_network_monthly('{{ macros.ds_format(macros.ds_add(yesterday_ds, -1), "%Y-%m-%d", "%Y-%m-01") }}');''',
                                     task_id='aggregate_monthly',
                                     postgres_conn_id='congestion_bot',
                                     autocommit=True,
                                     retries=0,
                                     dag=dag)


## Dependencies
wait_for_here >> aggregate_daily
# check_dow must not point back at aggregate_daily (that would form a cycle).
# No weekly aggregation task is defined in this file, so the Monday gate is
# currently a terminal task.
aggregate_daily >> check_dow
aggregate_daily >> check_dom >> check_monthly >> aggregate_monthly