Skip to content

Commit

Permalink
#62 Add DAG to automate generate daily and monthly tt
Browse files Browse the repository at this point in the history
  • Loading branch information
chmnata committed Sep 6, 2022
1 parent 9ef6528 commit 06a1b19
Showing 1 changed file with 127 additions and 0 deletions.
127 changes: 127 additions & 0 deletions dags/generate_congestion_agg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
from airflow import DAG
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator, ShortCircuitOperator
from airflow.hooks.postgres_hook import PostgresHook
from airflow.hooks.base_hook import BaseHook
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.sensors import ExternalTaskSensor
from airflow.operators.sql import SQLCheckOperator

from psycopg2 import sql
from psycopg2.extras import execute_values
from psycopg2 import connect, Error
import logging

LOGGER = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)

# Slack alert
SLACK_CONN_ID = 'slack'
def task_fail_slack_alert(context):
slack_webhook_token = BaseHook.get_connection(SLACK_CONN_ID).password
task_msg = '<@UF4RQFQ11> !!! {task_id} in congestion_refresh DAG failed.'.format(task_id=context.get('task_instance').task_id)
slack_msg = task_msg + """(<{log_url}|log>)""".format(
log_url=context.get('task_instance').log_url,)
failed_alert = SlackWebhookOperator(
task_id='slack_test',
http_conn_id='slack',
webhook_token=slack_webhook_token,
message=slack_msg,
username='airflow',
)
return failed_alert.execute(context=context)

default_args = {'owner':'natalie',
'depends_on_past':False,
'start_date': datetime(2022, 8, 10),
'email': ['natalie.chan@toronto.ca'],
'email_on_failure': False,
'email_on_success': False,
'retries': 0,
'retry_delay': timedelta(minutes=5),
'on_failure_callback': task_fail_slack_alert
}

dag = DAG('congestion_refresh',
default_args=default_args,
schedule_interval='30 16 * * * ', # same as pull_here task
catchup=False,
)

## Functions
# check if its monday
def is_monday(date_to_pull):
execution_date = datetime.strptime(date_to_pull, "%Y-%m-%d")
if execution_date.weekday() == 0:
return True
else:
return False

# check if its the start of the month
def is_day_one(date_to_pull):
execution_date = datetime.strptime(date_to_pull, "%Y-%m-%d")
if execution_date.day == 1:
return True
else:
return False


## Tasks ##
## ExternalTaskSensor to wait for pull_here
wait_for_here = ExternalTaskSensor(task_id='wait_for_here',
external_dag_id='pull_here',
external_task_id='pull_here',
start_date=datetime(2020, 1, 5),
failed_states=['failed']
)

## ShortCircuitOperator Tasks, python_callable returns True or False; False means skip downstream tasks
check_dow = ShortCircuitOperator(
task_id='check_dow',
provide_context=False,
python_callable=is_monday,
op_kwargs={'date_to_pull': '{{ yesterday_ds }}'},
dag=dag
)

check_dom = ShortCircuitOperator(
task_id='check_dom',
provide_context=False,
python_callable=is_day_one,
op_kwargs={'date_to_pull': '{{ yesterday_ds }}'},
dag=dag
)

## SQLCheckOperator to check if all of last weeks data is in the data before aggregating
check_monthly = SQLCheckOperator(task_id = 'check_daily',
conn_id='congestion_bot',
sql = '''SELECT case when count(distinct dt) = extract('days' FROM ('{{ yesterday_ds }}'::date + interval '1 month' - '{{ yesterday_ds }}'::date))
then TRUE else FALSE end as counts
from here.ta''',
dag=dag)

## Postgres Tasks
# Task to aggregate segment level tt daily
aggregate_daily = PostgresOperator(sql='''SELECT congestion.generate_network_daily('{{ yesterday_ds }}') ''',
task_id='aggregate_daily',
postgres_conn_id='congestion_bot',
autocommit=True,
retries = 0,
dag=dag)


# Task to aggregate segment level tt monthly
aggregate_monthly = PostgresOperator(sql='''select congestion.generate_network_monthly('{{ ds.format(macros.ds_add(ds, -1), "%Y-%m-01") }}');''',
task_id='aggregate_monthly',
postgres_conn_id='congestion_bot',
autocommit=True,
retries = 0,
dag=dag)



wait_for_here >> aggregate_daily >> check_dow >> aggregate_daily
wait_for_here >> aggregate_daily >> check_dom >> check_monthly >> aggregate_monthly

0 comments on commit 06a1b19

Please sign in to comment.