From 8dfff79a520ece0dcaa05b2564b82b8b8279f734 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lu=C3=A3=20Bida=20Vacaro?= Date: Thu, 23 Mar 2023 18:46:19 -0300 Subject: [PATCH 01/22] fix(sinan): refactor sinan dag --- containers/airflow/dags/brasil/sinan.py | 142 ++++++++++++++++++++---- 1 file changed, 120 insertions(+), 22 deletions(-) diff --git a/containers/airflow/dags/brasil/sinan.py b/containers/airflow/dags/brasil/sinan.py index 06478a44..757a97fc 100644 --- a/containers/airflow/dags/brasil/sinan.py +++ b/containers/airflow/dags/brasil/sinan.py @@ -42,24 +42,27 @@ import pandas as pd import logging as logger +from ftplib import FTP +from operator import is_not +from itertools import chain +from functools import partial from datetime import timedelta from pysus.online_data import SINAN from airflow import DAG from airflow.decorators import task, dag +from airflow.models import DagRun, TaskInstance from epigraphhub.settings import env from epigraphhub.connection import get_engine from epigraphhub.data.brasil.sinan import ( FTP_SINAN, extract, - loading, + viz, DISEASES, normalize_str, ) -ENG = get_engine(credential_name=env.db.default_credential) -SCHEMA = "brasil" DEFAULT_ARGS = { "owner": "epigraphhub", "depends_on_past": False, @@ -78,31 +81,126 @@ def task_flow_for(disease: str): in. SINAN DAGs will have the same workflow. """ + schema = 'brasil' tablename = "sinan_" + normalize_str(disease) + "_m" + engine = get_engine(credential_name=env.db.default_credential) - def _count_table_rows() -> dict: - """ - Counts table rows from brasil's Schema - """ - with ENG.connect() as conn: - try: - cur = conn.execute(f"SELECT COUNT(*) FROM {SCHEMA}.{tablename}") - rowcount = cur.fetchone()[0] - except Exception as e: - if "UndefinedTable" in str(e): - return dict(rows=0) - else: - raise e - return dict(rows=rowcount) + prelim_years = list(map(int, FTP_SINAN(disease).get_years('prelim'))) + finals_years = list(map(int, FTP_SINAN(disease).get_years('finais'))) @task(task_id="start") - def start() -> int: + def start_task(): """ - Task to start the workflow, will read the database and return - the rows count for a SINAN disease. + Task to start the workflow, extracts all the last update date + for the each DBC file in FTP server. SINAN DAG will use the + previous start task run to decide rather the dbc should be + inserted into DB or not. 
""" - logger.info(f"ETL started for {disease}") - return _count_table_rows() + with engine.connect() as conn: + conn.execute( + f'CREATE TABLE IF NOT EXISTS {schema}.sinan_update_ctl (' + ' disease TEXT NOT NULL,' + ' year INT NOT NULL,' + ' prelim BOOL NOT NULL,' + ' last_insert DATE' + ')' + ) + + @task(task_id="get_updates") + def dbcs_to_fetch() -> dict: + all_years = prelim_years + finals_years + + db_years = [] + with engine.connect() as conn: + cur = conn.execute( + f'SELECT year FROM {schema}.sinan_update_ctl' + f' WHERE disease = {disease}' + ) + db_years.extend(list(chain(*cur.all()))) + not_inserted = [y for y in all_years if y not in db_years] + + db_prelimns = [] + with engine.connect() as conn: + cur = conn.execute( + f'SELECT year FROM {schema}.sinan_update_ctl' + f' WHERE disease = {disease} AND prelim IS True' + ) + db_years.extend(list(chain(*cur.all()))) + prelim_to_final = [y for y in finals_years if y in db_prelimns] + prelim_to_update = [y for y in prelim_years if y in db_prelimns] + + return dict( + to_insert = not_inserted, + to_finals = prelim_to_final, + to_update = prelim_to_update + ) + + @task(task_id="extract") + def extract_parquets(**kwargs) -> dict: + ti = kwargs["ti"] + years = ti.xcom_pull(task_ids="get_updates") + + extract_pqs = lambda stage: extract.download( + disease=disease, years=years[stage] + ) if any(years[stage]) else [] + + return dict( + pqs_to_insert = extract_pqs('to_insert'), + pqs_to_finals = extract_pqs('to_finals'), + pqs_to_update = extract_pqs('to_update') + ) + + @task(task_id='first_insertion') + def upload_not_inserted(**kwargs): + ti = kwargs["ti"] + parquets = ti.xcom_pull(task_ids="extract") + get_year = lambda file: int(str(file).split('.parquet')[0][-2:]) + + finals, prelims = ([], []) + for parquet in parquets: + ( + finals.append(parquet) + if get_year(parquet) in finals_years + else prelims.append(get_year(parquet)) + ) + + upload = lambda df: df.to_sql( + name=tablename, + con=engine.connect(), + schema=schema, + if_exists='append' + ) + + for final_pq in finals: + df = viz(final_pq) + df['year'] = (get_year(final_pq)) + df['prelim'] = (False) + upload(df) + logger.info(f'{final_pq} inserted into db') + with engine.connect() as conn: + conn.execute( + f'INSERT INTO {schema}.sinan_update_ctl(' + 'disease, year, prelim, last_insert) VALUES (' + f'{disease},{get_year(final_pq)},False,{ti.execution_date})' + ) + + for prelim_pq in prelims: + df = viz(prelim_pq) + df['year'] = (get_year(prelim_pq)) + df['prelim'] = (True) + upload(df) + logger.info(f'{prelim_pq} inserted into db') + with engine.connect() as conn: + conn.execute( + f'INSERT INTO {schema}.sinan_update_ctl(' + 'disease, year, prelim, last_insert) VALUES (' + f'{disease},{get_year(prelim_pq)},True,{ti.execution_date})' + ) + + + + + @task(task_id="extract", retries=3) def download(disease: str) -> list: From adacfe0a500dc162646e2c8e4f0eb339756040e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lu=C3=A3=20Bida=20Vacaro?= Date: Fri, 24 Mar 2023 09:45:22 -0300 Subject: [PATCH 02/22] Insertion methods --- containers/airflow/dags/brasil/sinan.py | 224 +++++++++++++++--------- 1 file changed, 140 insertions(+), 84 deletions(-) diff --git a/containers/airflow/dags/brasil/sinan.py b/containers/airflow/dags/brasil/sinan.py index 757a97fc..9453a89d 100644 --- a/containers/airflow/dags/brasil/sinan.py +++ b/containers/airflow/dags/brasil/sinan.py @@ -42,16 +42,14 @@ import pandas as pd import logging as logger -from ftplib import FTP -from operator import is_not from itertools 
import chain -from functools import partial from datetime import timedelta from pysus.online_data import SINAN from airflow import DAG from airflow.decorators import task, dag -from airflow.models import DagRun, TaskInstance +from airflow.operators.empty import EmptyOperator +from airflow.exceptions import AirflowSkipException from epigraphhub.settings import env from epigraphhub.connection import get_engine @@ -88,6 +86,15 @@ def task_flow_for(disease: str): prelim_years = list(map(int, FTP_SINAN(disease).get_years('prelim'))) finals_years = list(map(int, FTP_SINAN(disease).get_years('finais'))) + get_year = lambda file: int(str(file).split('.parquet')[0][-2:]) + + upload_df = lambda df: df.to_sql( + name=tablename, + con=engine.connect(), + schema=schema, + if_exists='append' + ) + @task(task_id="start") def start_task(): """ @@ -142,7 +149,7 @@ def extract_parquets(**kwargs) -> dict: extract_pqs = lambda stage: extract.download( disease=disease, years=years[stage] - ) if any(years[stage]) else [] + ) if any(years[stage]) else () return dict( pqs_to_insert = extract_pqs('to_insert'), @@ -151,10 +158,14 @@ def extract_parquets(**kwargs) -> dict: ) @task(task_id='first_insertion') - def upload_not_inserted(**kwargs): + def upload_not_inserted(**kwargs) -> dict: ti = kwargs["ti"] - parquets = ti.xcom_pull(task_ids="extract") - get_year = lambda file: int(str(file).split('.parquet')[0][-2:]) + parquets = ti.xcom_pull(task_ids="extract")['pqs_to_insert'] + inserted_rows = dict() + + if not parquets: + logger.info('There is no new DBCs to insert on DB') + raise AirflowSkipException() finals, prelims = ([], []) for parquet in parquets: @@ -163,91 +174,133 @@ def upload_not_inserted(**kwargs): if get_year(parquet) in finals_years else prelims.append(get_year(parquet)) ) - - upload = lambda df: df.to_sql( - name=tablename, - con=engine.connect(), - schema=schema, - if_exists='append' - ) - - for final_pq in finals: - df = viz(final_pq) - df['year'] = (get_year(final_pq)) + + for final_pq in (finals or []): + year = get_year(final_pq) + df = viz.parquet(final_pq) + if df.empty: + raise ValueError('DataFrame is empty') + df['year'] = (year) df['prelim'] = (False) - upload(df) + upload_df(df) logger.info(f'{final_pq} inserted into db') with engine.connect() as conn: conn.execute( f'INSERT INTO {schema}.sinan_update_ctl(' 'disease, year, prelim, last_insert) VALUES (' - f'{disease},{get_year(final_pq)},False,{ti.execution_date})' + f'{disease}, {year}, False, {ti.execution_date})' ) - - for prelim_pq in prelims: - df = viz(prelim_pq) - df['year'] = (get_year(prelim_pq)) + cur = conn.execute( + f'SELECT COUNT(*) FROM {schema}.{tablename}' + f' WHERE year = {year}' + ) + inserted_rows[year] = cur.fetchone[0] + + for prelim_pq in prelims or []: + year = get_year(prelim_pq) + df = viz.parquet(prelim_pq) + if df.empty: + raise ValueError('DataFrame is empty') + df['year'] = (year) df['prelim'] = (True) - upload(df) + upload_df(df) logger.info(f'{prelim_pq} inserted into db') with engine.connect() as conn: conn.execute( f'INSERT INTO {schema}.sinan_update_ctl(' 'disease, year, prelim, last_insert) VALUES (' - f'{disease},{get_year(prelim_pq)},True,{ti.execution_date})' + f'{disease}, {year}, True, {ti.execution_date})' ) + cur = conn.execute( + f'SELECT COUNT(*) FROM {schema}.{tablename}' + f' WHERE year = {year}' + ) + inserted_rows[year] = cur.fetchone[0] + + return inserted_rows + @task(task_id='prelims_to_finals') + def update_prelim_to_final(**kwargs): + ti = kwargs["ti"] + parquets = 
ti.xcom_pull(task_ids="extract")['pqs_to_finals'] + if not parquets: + logger.info( + 'Not found any prelim DBC that have been passed to finals' + ) + raise AirflowSkipException() + + for parquet in parquets: + year = get_year(parquet) + df = viz.parquet(parquet) + if df.empty: + raise ValueError('DataFrame is empty') + df['year'] = (year) + df['prelim'] = (False) + with engine.connect() as conn: + conn.execute( + f'DELETE FROM {schema}.{tablename}' + f' WHERE year = {year}' + f' AND prelim = True' + ) + + upload_df(df) + logger.info( + f'{parquet} data updated from prelim to final.' + ) + with engine.connect() as conn: + conn.execute( + f'UPDATE {schema}.sinan_update_ctl' + f' SET prelim = False, last_insert = {ti.execution_date}' + f' WHERE disease = {disease} AND year = {year}' + ) - - @task(task_id="extract", retries=3) - def download(disease: str) -> list: - """ - This task is responsible for downloading every year found for - a disease. It will download at `/tmp/pysus/` and return a list - with downloaded parquet paths. - """ - years = FTP_SINAN(disease).get_years() - parquet_dirs = extract.download(disease=disease, years=years) - logger.info(f"Data for {disease} extracted") - return parquet_dirs - - @task(task_id="upload") - def upload(disease: str, **kwargs) -> None: - """ - This task is responsible for uploading each parquet dir into - postgres database. It receives the disease name and the xcom - from `download` task to insert. - """ + @task(task_id='update_prelims') + def update_prelim_parquets(**kwargs): ti = kwargs["ti"] - parquets_dirs = ti.xcom_pull(task_ids="extract") - for dir in parquets_dirs: - try: - loading.upload(disease=disease, parquet_dir=dir) - logger.info(f"{dir} inserted into db") - except Exception as e: - logger.error(e) - raise e - - @task(task_id="diagnosis") - def compare_tables_rows(**kwargs) -> int: - """ - This task will be responsible for checking how many rows were - inserted into a disease table. It will compare with the start - task and store the difference as a xcom. - """ - ti = kwargs["ti"] - ini_rows_amt = ti.xcom_pull(task_ids="start") - end_rows_amt = _count_table_rows() + parquets = ti.xcom_pull(task_ids="extract")['pqs_to_update'] - new_rows = end_rows_amt["rows"] - ini_rows_amt["rows"] + if not parquets: + logger.info('No preliminary parquet found to update') + raise AirflowSkipException() - logger.info(f"{new_rows} new rows inserted into brasil.{tablename}") + for parquet in parquets: + year = get_year(parquet) + df = viz.parquet(parquet) + if df.empty: + raise ValueError('DataFrame is empty') + df['year'] = (year) + df['prelim'] = (True) - ti.xcom_push(key="rows", value=ini_rows_amt["rows"]) - ti.xcom_push(key="new_rows", value=new_rows) + with engine.connect() as conn: + cur = conn.execute( + f'SELECT COUNT(*) FROM {schema}.{tablename}' + f' WHERE year = {year}' + ) + conn.execute( + f'DELETE FROM {schema}.{tablename}' + f' WHERE year = {year}' + f' AND prelim = True' + ) + old_rows = cur.fetchone[0] + + upload_df(df) + logger.info( + f'{parquet} data updated' + '\n~~~~~ ' + f'\nRows inserted: {len(df)}' + f'\nNew rows: {len(df) - int(old_rows)}' + '\n~~~~~ ' + ) + + with engine.connect() as conn: + conn.execute( + f'UPDATE {schema}.sinan_update_ctl' + f' SET last_insert = {ti.execution_date}' + f' WHERE disease = {disease} AND year = {year}' + ) @task(trigger_rule="all_done") def remove_parquets(**kwargs) -> None: @@ -257,30 +310,33 @@ def remove_parquets(**kwargs) -> None: task receives and delete all them. 
""" ti = kwargs["ti"] - parquet_dirs = ti.xcom_pull(task_ids="extract") + pqts = ti.xcom_pull(task_ids="extract") + + parquet_dirs = list( + chain(*(pqts['to_insert'], pqts['to_finals'], pqts['to_update'])) + ) for dir in parquet_dirs: shutil.rmtree(dir, ignore_errors=True) logger.warning(f"{dir} removed") - @task(trigger_rule="none_failed") - def done(**kwargs) -> None: - """This task will fail if any upstream task fails.""" - ti = kwargs["ti"] - print(ti.xcom_pull(key="state", task_ids="upload")) - if ti.xcom_pull(key="state", task_ids="upload") == "FAILED": - raise ValueError("Force failure because upstream task has failed") + + end = EmptyOperator( + task_id="done", + trigger_rule="all_success", + ) # Defining the tasks - ini = start() - E = download(disease) - L = upload(disease) - diagnosis = compare_tables_rows() + ini = start_task() + dbcs = dbcs_to_fetch() + E = extract_parquets() + upload_new = upload_not_inserted() + to_final = update_prelim_to_final() + prelims = update_prelim_parquets() clean = remove_parquets() - end = done() # Task flow - ini >> E >> L >> diagnosis >> clean >> end + ini >> dbcs >> E >> upload_new >> to_final >> prelims >> clean >> end def create_dag( From 68db71b645a273420c99bdd81dbe454da233f6e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lu=C3=A3=20Bida=20Vacaro?= Date: Fri, 24 Mar 2023 09:47:27 -0300 Subject: [PATCH 03/22] Fix metadata dag --- containers/airflow/dags/brasil/sinan.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/containers/airflow/dags/brasil/sinan.py b/containers/airflow/dags/brasil/sinan.py index 9453a89d..78a9cc23 100644 --- a/containers/airflow/dags/brasil/sinan.py +++ b/containers/airflow/dags/brasil/sinan.py @@ -385,6 +385,8 @@ def metadata_tables(): available, only skip it. """ + engine = get_engine(credential_name=env.db.default_credential) + @task(task_id="insert_metadata_tables") def metadata_tables(): for disease in DISEASES: @@ -393,8 +395,8 @@ def metadata_tables(): pd.DataFrame.to_sql( metadata_df, f"sinan_{normalize_str(disease)}_metadata", - con=ENG, - schema=SCHEMA, + con=engine, + schema='brasil', if_exists="replace", ) From 073b95b3f889e229877071bb65fcf4cce32c2ade Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lu=C3=A3=20Bida=20Vacaro?= Date: Fri, 24 Mar 2023 11:19:07 -0300 Subject: [PATCH 04/22] Fixing DagBad timeout error --- containers/airflow/airflow.cfg | 4 +- containers/airflow/dags/brasil/sinan.py | 77 +++++-------------- .../airflow/dags/brasil/sinan_metadata.py | 65 ++++++++++++++++ 3 files changed, 87 insertions(+), 59 deletions(-) create mode 100644 containers/airflow/dags/brasil/sinan_metadata.py diff --git a/containers/airflow/airflow.cfg b/containers/airflow/airflow.cfg index b0425a50..efff4813 100644 --- a/containers/airflow/airflow.cfg +++ b/containers/airflow/airflow.cfg @@ -96,7 +96,7 @@ execute_tasks_new_python_interpreter = False donot_pickle = True # How long before timing out a python file import -dagbag_import_timeout = 30.0 +dagbag_import_timeout = 300.0 # Should a traceback be shown in the UI for dagbag import errors, # instead of just the exception message @@ -881,7 +881,7 @@ pool = prefork # The number of seconds to wait before timing out ``send_task_to_executor`` or # ``fetch_celery_task_state`` operations. -operation_timeout = 1.0 +operation_timeout = 5.0 # Celery task will report its status as 'started' when the task is executed by a worker. 
# This is used in Airflow to keep track of the running tasks and if a Scheduler is restarted diff --git a/containers/airflow/dags/brasil/sinan.py b/containers/airflow/dags/brasil/sinan.py index 78a9cc23..97eda5c3 100644 --- a/containers/airflow/dags/brasil/sinan.py +++ b/containers/airflow/dags/brasil/sinan.py @@ -37,29 +37,14 @@ This task will fail if any task above fails, breaking the DAG. """ -import shutil import pendulum -import pandas as pd import logging as logger - -from itertools import chain from datetime import timedelta -from pysus.online_data import SINAN -from airflow import DAG -from airflow.decorators import task, dag +from airflow.decorators import task from airflow.operators.empty import EmptyOperator -from airflow.exceptions import AirflowSkipException from epigraphhub.settings import env -from epigraphhub.connection import get_engine -from epigraphhub.data.brasil.sinan import ( - FTP_SINAN, - extract, - viz, - DISEASES, - normalize_str, -) DEFAULT_ARGS = { "owner": "epigraphhub", @@ -78,6 +63,10 @@ def task_flow_for(disease: str): creating the task dependencies for the SINAN disease that is passed in. SINAN DAGs will have the same workflow. """ + from itertools import chain + from epigraphhub.connection import get_engine + from airflow.exceptions import AirflowSkipException + from epigraphhub.data.brasil.sinan import FTP_SINAN, normalize_str schema = 'brasil' tablename = "sinan_" + normalize_str(disease) + "_m" @@ -87,7 +76,7 @@ def task_flow_for(disease: str): finals_years = list(map(int, FTP_SINAN(disease).get_years('finais'))) get_year = lambda file: int(str(file).split('.parquet')[0][-2:]) - + upload_df = lambda df: df.to_sql( name=tablename, con=engine.connect(), @@ -144,6 +133,8 @@ def dbcs_to_fetch() -> dict: @task(task_id="extract") def extract_parquets(**kwargs) -> dict: + from epigraphhub.data.brasil.sinan import extract + ti = kwargs["ti"] years = ti.xcom_pull(task_ids="get_updates") @@ -159,6 +150,8 @@ def extract_parquets(**kwargs) -> dict: @task(task_id='first_insertion') def upload_not_inserted(**kwargs) -> dict: + from epigraphhub.data.brasil.sinan import viz + ti = kwargs["ti"] parquets = ti.xcom_pull(task_ids="extract")['pqs_to_insert'] inserted_rows = dict() @@ -221,6 +214,8 @@ def upload_not_inserted(**kwargs) -> dict: @task(task_id='prelims_to_finals') def update_prelim_to_final(**kwargs): + from epigraphhub.data.brasil.sinan import viz + ti = kwargs["ti"] parquets = ti.xcom_pull(task_ids="extract")['pqs_to_finals'] @@ -259,6 +254,8 @@ def update_prelim_to_final(**kwargs): @task(task_id='update_prelims') def update_prelim_parquets(**kwargs): + from epigraphhub.data.brasil.sinan import viz + ti = kwargs["ti"] parquets = ti.xcom_pull(task_ids="extract")['pqs_to_update'] @@ -304,6 +301,7 @@ def update_prelim_parquets(**kwargs): @task(trigger_rule="all_done") def remove_parquets(**kwargs) -> None: + import shutil """ This task will be responsible for deleting all parquet files downloaded. It will receive the same parquet dirs the `upload` @@ -339,6 +337,8 @@ def remove_parquets(**kwargs) -> None: ini >> dbcs >> E >> upload_new >> to_final >> prelims >> clean >> end +from epigraphhub.data.brasil.sinan import DISEASES + def create_dag( disease: str, schedule: str, @@ -349,6 +349,8 @@ def create_dag( SINAN disease. It will receive the disease, its schedule and the start date, returning a DAG with the task flow. 
""" + from airflow import DAG + sinan_tag = ["SINAN", "Brasil"] sinan_tag.append(disease) DEFAULT_ARGS.update(start_date=start) @@ -360,6 +362,7 @@ def create_dag( start_date=start, catchup=False, schedule_interval=schedule, + dagrun_timeout=timedelta(minutes=10) ) with dag: @@ -369,46 +372,6 @@ def create_dag( # DAGs -@dag( - "SINAN_METADATA", - default_args=DEFAULT_ARGS, - tags=["SINAN", "Brasil", "Metadata"], - start_date=pendulum.datetime(2022, 2, 1), - catchup=False, - schedule_interval="@once", -) -def metadata_tables(): - """ - This DAG will run only once and create a table in the - database with each metadata found in the pysus SINAN - disease metadata. It will not fail if a metadata is not - available, only skip it. - """ - - engine = get_engine(credential_name=env.db.default_credential) - - @task(task_id="insert_metadata_tables") - def metadata_tables(): - for disease in DISEASES: - try: - metadata_df = SINAN.metadata_df(disease) - pd.DataFrame.to_sql( - metadata_df, - f"sinan_{normalize_str(disease)}_metadata", - con=engine, - schema='brasil', - if_exists="replace", - ) - - logger.info(f"Metadata table for {disease} updated.") - except Exception: - print(f"No metadata available for {disease}") - - meta = metadata_tables() - meta - - -dag = metadata_tables() # Here its where the DAGs are created, an specific case can be specified for disease in DISEASES: # Change DAG variables here diff --git a/containers/airflow/dags/brasil/sinan_metadata.py b/containers/airflow/dags/brasil/sinan_metadata.py new file mode 100644 index 00000000..e771668a --- /dev/null +++ b/containers/airflow/dags/brasil/sinan_metadata.py @@ -0,0 +1,65 @@ +import pendulum +import pandas as pd +import logging as logger + +from datetime import timedelta +from pysus.online_data import SINAN +from airflow.decorators import task, dag + +from epigraphhub.settings import env +from epigraphhub.connection import get_engine +from epigraphhub.data.brasil.sinan import ( + DISEASES, + normalize_str, +) + +DEFAULT_ARGS = { + "owner": "epigraphhub", + "depends_on_past": False, + "email": ["epigraphhub@thegraphnetwork.org"], + "email_on_failure": True, + "email_on_retry": False, + "retries": 2, + "retry_delay": timedelta(minutes=2), +} + + +@dag( + "SINAN_METADATA", + default_args=DEFAULT_ARGS, + tags=["SINAN", "Brasil", "Metadata"], + start_date=pendulum.datetime(2022, 2, 1), + catchup=False, + schedule_interval="@once", +) +def metadata_tables(): + """ + This DAG will run only once and create a table in the + database with each metadata found in the pysus SINAN + disease metadata. It will not fail if a metadata is not + available, only skip it. 
+ """ + + engine = get_engine(credential_name=env.db.default_credential) + + @task(task_id="insert_metadata_tables") + def metadata_tables(): + for disease in DISEASES: + try: + metadata_df = SINAN.metadata_df(disease) + pd.DataFrame.to_sql( + metadata_df, + f"sinan_{normalize_str(disease)}_metadata", + con=engine, + schema='brasil', + if_exists="replace", + ) + + logger.info(f"Metadata table for {disease} updated.") + except Exception: + print(f"No metadata available for {disease}") + + meta = metadata_tables() + meta + +dag = metadata_tables() From aa9391d224478a286aeafca99a035cfd363668b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lu=C3=A3=20Bida=20Vacaro?= Date: Fri, 24 Mar 2023 11:58:12 -0300 Subject: [PATCH 05/22] Minor fixes --- containers/airflow/dags/brasil/sinan.py | 34 ++++++++++++++++--------- 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/containers/airflow/dags/brasil/sinan.py b/containers/airflow/dags/brasil/sinan.py index 97eda5c3..e478e60f 100644 --- a/containers/airflow/dags/brasil/sinan.py +++ b/containers/airflow/dags/brasil/sinan.py @@ -40,6 +40,7 @@ import pendulum import logging as logger from datetime import timedelta +from psycopg2.errors import UndefinedColumn from airflow.decorators import task from airflow.operators.empty import EmptyOperator @@ -108,20 +109,29 @@ def dbcs_to_fetch() -> dict: db_years = [] with engine.connect() as conn: - cur = conn.execute( - f'SELECT year FROM {schema}.sinan_update_ctl' - f' WHERE disease = {disease}' - ) - db_years.extend(list(chain(*cur.all()))) + try: + cur = conn.execute( + f'SELECT year FROM {schema}.sinan_update_ctl' + f' WHERE disease = "{disease}"' + ) + years = cur.all() + except UndefinedColumn: + years = [] + db_years.extend(list(chain(*years))) not_inserted = [y for y in all_years if y not in db_years] db_prelimns = [] with engine.connect() as conn: - cur = conn.execute( - f'SELECT year FROM {schema}.sinan_update_ctl' - f' WHERE disease = {disease} AND prelim IS True' - ) - db_years.extend(list(chain(*cur.all()))) + try: + cur = conn.execute( + f'SELECT year FROM {schema}.sinan_update_ctl' + f' WHERE disease = "{disease}" AND prelim IS True' + ) + years = cur.all() + except UndefinedColumn: + years = [] + db_years.extend(list(chain(*years))) + prelim_to_final = [y for y in finals_years if y in db_prelimns] prelim_to_update = [y for y in prelim_years if y in db_prelimns] @@ -249,7 +259,7 @@ def update_prelim_to_final(**kwargs): conn.execute( f'UPDATE {schema}.sinan_update_ctl' f' SET prelim = False, last_insert = {ti.execution_date}' - f' WHERE disease = {disease} AND year = {year}' + f' WHERE disease = "{disease}" AND year = {year}' ) @task(task_id='update_prelims') @@ -296,7 +306,7 @@ def update_prelim_parquets(**kwargs): conn.execute( f'UPDATE {schema}.sinan_update_ctl' f' SET last_insert = {ti.execution_date}' - f' WHERE disease = {disease} AND year = {year}' + f' WHERE disease = "{disease}" AND year = {year}' ) @task(trigger_rule="all_done") From 2c650c19840211c9d448636ecc22eadc6dcdf6ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lu=C3=A3=20Bida=20Vacaro?= Date: Fri, 24 Mar 2023 12:04:34 -0300 Subject: [PATCH 06/22] Linter --- containers/airflow/dags/brasil/sinan.py | 149 +++++++++--------- .../airflow/dags/brasil/sinan_metadata.py | 31 ++-- 2 files changed, 90 insertions(+), 90 deletions(-) diff --git a/containers/airflow/dags/brasil/sinan.py b/containers/airflow/dags/brasil/sinan.py index e478e60f..fb2f088f 100644 --- a/containers/airflow/dags/brasil/sinan.py +++ 
b/containers/airflow/dags/brasil/sinan.py @@ -1,9 +1,9 @@ -""" +""" @author Luã Bida Vacaro | github.com/luabida @date Last change on 2023-03-14 This is an Airflow DAG. This DAG is responsible for running scripts for -collecting data from PySUS SINAN. The API that fetches the data is +collecting data from PySUS SINAN. The API that fetches the data is available on: https://github.com/AlertaDengue/PySUS A detailed article about the Airflow used in EpiGraphHub can be found @@ -13,7 +13,7 @@ Task Summary ------------ -start (PythonOperator): +start (PythonOperator): This task is the start of the task flow. It will count the rows for a disease and store it as a XCom value. @@ -35,7 +35,6 @@ done (PythonOperator): This task will fail if any task above fails, breaking the DAG. - """ import pendulum import logging as logger @@ -48,13 +47,13 @@ from epigraphhub.settings import env DEFAULT_ARGS = { - "owner": "epigraphhub", - "depends_on_past": False, - "email": ["epigraphhub@thegraphnetwork.org"], - "email_on_failure": True, - "email_on_retry": False, - "retries": 2, - "retry_delay": timedelta(minutes=2), + 'owner': 'epigraphhub', + 'depends_on_past': False, + 'email': ['epigraphhub@thegraphnetwork.org'], + 'email_on_failure': True, + 'email_on_retry': False, + 'retries': 2, + 'retry_delay': timedelta(minutes=2), } @@ -70,22 +69,19 @@ def task_flow_for(disease: str): from epigraphhub.data.brasil.sinan import FTP_SINAN, normalize_str schema = 'brasil' - tablename = "sinan_" + normalize_str(disease) + "_m" + tablename = 'sinan_' + normalize_str(disease) + '_m' engine = get_engine(credential_name=env.db.default_credential) prelim_years = list(map(int, FTP_SINAN(disease).get_years('prelim'))) finals_years = list(map(int, FTP_SINAN(disease).get_years('finais'))) get_year = lambda file: int(str(file).split('.parquet')[0][-2:]) - + upload_df = lambda df: df.to_sql( - name=tablename, - con=engine.connect(), - schema=schema, - if_exists='append' - ) + name=tablename, con=engine.connect(), schema=schema, if_exists='append' + ) - @task(task_id="start") + @task(task_id='start') def start_task(): """ Task to start the workflow, extracts all the last update date @@ -102,8 +98,8 @@ def start_task(): ' last_insert DATE' ')' ) - - @task(task_id="get_updates") + + @task(task_id='get_updates') def dbcs_to_fetch() -> dict: all_years = prelim_years + finals_years @@ -131,41 +127,43 @@ def dbcs_to_fetch() -> dict: except UndefinedColumn: years = [] db_years.extend(list(chain(*years))) - + prelim_to_final = [y for y in finals_years if y in db_prelimns] prelim_to_update = [y for y in prelim_years if y in db_prelimns] return dict( - to_insert = not_inserted, - to_finals = prelim_to_final, - to_update = prelim_to_update + to_insert=not_inserted, + to_finals=prelim_to_final, + to_update=prelim_to_update, ) - @task(task_id="extract") + @task(task_id='extract') def extract_parquets(**kwargs) -> dict: from epigraphhub.data.brasil.sinan import extract - ti = kwargs["ti"] - years = ti.xcom_pull(task_ids="get_updates") + ti = kwargs['ti'] + years = ti.xcom_pull(task_ids='get_updates') - extract_pqs = lambda stage: extract.download( - disease=disease, years=years[stage] - ) if any(years[stage]) else () + extract_pqs = ( + lambda stage: extract.download(disease=disease, years=years[stage]) + if any(years[stage]) + else () + ) return dict( - pqs_to_insert = extract_pqs('to_insert'), - pqs_to_finals = extract_pqs('to_finals'), - pqs_to_update = extract_pqs('to_update') + pqs_to_insert=extract_pqs('to_insert'), + 
pqs_to_finals=extract_pqs('to_finals'), + pqs_to_update=extract_pqs('to_update'), ) - + @task(task_id='first_insertion') def upload_not_inserted(**kwargs) -> dict: from epigraphhub.data.brasil.sinan import viz - ti = kwargs["ti"] - parquets = ti.xcom_pull(task_ids="extract")['pqs_to_insert'] + ti = kwargs['ti'] + parquets = ti.xcom_pull(task_ids='extract')['pqs_to_insert'] inserted_rows = dict() - + if not parquets: logger.info('There is no new DBCs to insert on DB') raise AirflowSkipException() @@ -173,18 +171,18 @@ def upload_not_inserted(**kwargs) -> dict: finals, prelims = ([], []) for parquet in parquets: ( - finals.append(parquet) - if get_year(parquet) in finals_years + finals.append(parquet) + if get_year(parquet) in finals_years else prelims.append(get_year(parquet)) ) - - for final_pq in (finals or []): + + for final_pq in finals or []: year = get_year(final_pq) df = viz.parquet(final_pq) if df.empty: raise ValueError('DataFrame is empty') - df['year'] = (year) - df['prelim'] = (False) + df['year'] = year + df['prelim'] = False upload_df(df) logger.info(f'{final_pq} inserted into db') with engine.connect() as conn: @@ -204,8 +202,8 @@ def upload_not_inserted(**kwargs) -> dict: df = viz.parquet(prelim_pq) if df.empty: raise ValueError('DataFrame is empty') - df['year'] = (year) - df['prelim'] = (True) + df['year'] = year + df['prelim'] = True upload_df(df) logger.info(f'{prelim_pq} inserted into db') with engine.connect() as conn: @@ -219,29 +217,29 @@ def upload_not_inserted(**kwargs) -> dict: f' WHERE year = {year}' ) inserted_rows[year] = cur.fetchone[0] - + return inserted_rows @task(task_id='prelims_to_finals') def update_prelim_to_final(**kwargs): from epigraphhub.data.brasil.sinan import viz - ti = kwargs["ti"] - parquets = ti.xcom_pull(task_ids="extract")['pqs_to_finals'] + ti = kwargs['ti'] + parquets = ti.xcom_pull(task_ids='extract')['pqs_to_finals'] if not parquets: logger.info( 'Not found any prelim DBC that have been passed to finals' ) raise AirflowSkipException() - + for parquet in parquets: year = get_year(parquet) df = viz.parquet(parquet) if df.empty: raise ValueError('DataFrame is empty') - df['year'] = (year) - df['prelim'] = (False) + df['year'] = year + df['prelim'] = False with engine.connect() as conn: conn.execute( @@ -249,11 +247,9 @@ def update_prelim_to_final(**kwargs): f' WHERE year = {year}' f' AND prelim = True' ) - + upload_df(df) - logger.info( - f'{parquet} data updated from prelim to final.' 
- ) + logger.info(f'{parquet} data updated from prelim to final.') with engine.connect() as conn: conn.execute( @@ -266,20 +262,20 @@ def update_prelim_to_final(**kwargs): def update_prelim_parquets(**kwargs): from epigraphhub.data.brasil.sinan import viz - ti = kwargs["ti"] - parquets = ti.xcom_pull(task_ids="extract")['pqs_to_update'] + ti = kwargs['ti'] + parquets = ti.xcom_pull(task_ids='extract')['pqs_to_update'] if not parquets: logger.info('No preliminary parquet found to update') - raise AirflowSkipException() + raise AirflowSkipException() for parquet in parquets: year = get_year(parquet) df = viz.parquet(parquet) if df.empty: raise ValueError('DataFrame is empty') - df['year'] = (year) - df['prelim'] = (True) + df['year'] = year + df['prelim'] = True with engine.connect() as conn: cur = conn.execute( @@ -292,7 +288,7 @@ def update_prelim_parquets(**kwargs): f' AND prelim = True' ) old_rows = cur.fetchone[0] - + upload_df(df) logger.info( f'{parquet} data updated' @@ -309,16 +305,17 @@ def update_prelim_parquets(**kwargs): f' WHERE disease = "{disease}" AND year = {year}' ) - @task(trigger_rule="all_done") + @task(trigger_rule='all_done') def remove_parquets(**kwargs) -> None: import shutil + """ This task will be responsible for deleting all parquet files downloaded. It will receive the same parquet dirs the `upload` task receives and delete all them. """ - ti = kwargs["ti"] - pqts = ti.xcom_pull(task_ids="extract") + ti = kwargs['ti'] + pqts = ti.xcom_pull(task_ids='extract') parquet_dirs = list( chain(*(pqts['to_insert'], pqts['to_finals'], pqts['to_update'])) @@ -326,12 +323,11 @@ def remove_parquets(**kwargs) -> None: for dir in parquet_dirs: shutil.rmtree(dir, ignore_errors=True) - logger.warning(f"{dir} removed") - + logger.warning(f'{dir} removed') end = EmptyOperator( - task_id="done", - trigger_rule="all_success", + task_id='done', + trigger_rule='all_success', ) # Defining the tasks @@ -349,6 +345,7 @@ def remove_parquets(**kwargs) -> None: from epigraphhub.data.brasil.sinan import DISEASES + def create_dag( disease: str, schedule: str, @@ -361,18 +358,18 @@ def create_dag( """ from airflow import DAG - sinan_tag = ["SINAN", "Brasil"] + sinan_tag = ['SINAN', 'Brasil'] sinan_tag.append(disease) DEFAULT_ARGS.update(start_date=start) dag = DAG( - "SINAN_" + DISEASES[disease], + 'SINAN_' + DISEASES[disease], default_args=DEFAULT_ARGS, # Tasks and Dags tags=sinan_tag, # Only DAGs start_date=start, catchup=False, schedule_interval=schedule, - dagrun_timeout=timedelta(minutes=10) + dagrun_timeout=timedelta(minutes=10), ) with dag: @@ -386,9 +383,11 @@ def create_dag( for disease in DISEASES: # Change DAG variables here - dag_id = "SINAN_" + DISEASES[disease] + dag_id = 'SINAN_' + DISEASES[disease] globals()[dag_id] = create_dag( disease, - schedule="@monthly", - start=pendulum.datetime(2022, 2, len(disease)), # avoid memory overhead + schedule='@monthly', + start=pendulum.datetime( + 2022, 2, len(disease) + ), # avoid memory overhead ) diff --git a/containers/airflow/dags/brasil/sinan_metadata.py b/containers/airflow/dags/brasil/sinan_metadata.py index e771668a..0de710e7 100644 --- a/containers/airflow/dags/brasil/sinan_metadata.py +++ b/containers/airflow/dags/brasil/sinan_metadata.py @@ -14,23 +14,23 @@ ) DEFAULT_ARGS = { - "owner": "epigraphhub", - "depends_on_past": False, - "email": ["epigraphhub@thegraphnetwork.org"], - "email_on_failure": True, - "email_on_retry": False, - "retries": 2, - "retry_delay": timedelta(minutes=2), + 'owner': 'epigraphhub', + 
'depends_on_past': False, + 'email': ['epigraphhub@thegraphnetwork.org'], + 'email_on_failure': True, + 'email_on_retry': False, + 'retries': 2, + 'retry_delay': timedelta(minutes=2), } @dag( - "SINAN_METADATA", + 'SINAN_METADATA', default_args=DEFAULT_ARGS, - tags=["SINAN", "Brasil", "Metadata"], + tags=['SINAN', 'Brasil', 'Metadata'], start_date=pendulum.datetime(2022, 2, 1), catchup=False, - schedule_interval="@once", + schedule_interval='@once', ) def metadata_tables(): """ @@ -42,24 +42,25 @@ def metadata_tables(): engine = get_engine(credential_name=env.db.default_credential) - @task(task_id="insert_metadata_tables") + @task(task_id='insert_metadata_tables') def metadata_tables(): for disease in DISEASES: try: metadata_df = SINAN.metadata_df(disease) pd.DataFrame.to_sql( metadata_df, - f"sinan_{normalize_str(disease)}_metadata", + f'sinan_{normalize_str(disease)}_metadata', con=engine, schema='brasil', - if_exists="replace", + if_exists='replace', ) - logger.info(f"Metadata table for {disease} updated.") + logger.info(f'Metadata table for {disease} updated.') except Exception: - print(f"No metadata available for {disease}") + print(f'No metadata available for {disease}') meta = metadata_tables() meta + dag = metadata_tables() From 688243f621ee19f5ec9173fb860b944259e67f3e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lu=C3=A3=20Bida=20Vacaro?= Date: Fri, 24 Mar 2023 13:41:54 -0300 Subject: [PATCH 07/22] BUG: airflow only recognizes dags after command airflow dags reserialize --- containers/airflow/dags/brasil/sinan.py | 14 ++++++++------ containers/airflow/scripts/startup.sh | 3 +++ 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/containers/airflow/dags/brasil/sinan.py b/containers/airflow/dags/brasil/sinan.py index fb2f088f..d33e3fdc 100644 --- a/containers/airflow/dags/brasil/sinan.py +++ b/containers/airflow/dags/brasil/sinan.py @@ -111,8 +111,9 @@ def dbcs_to_fetch() -> dict: f' WHERE disease = "{disease}"' ) years = cur.all() - except UndefinedColumn: - years = [] + except Exception as e: + if "UndefinedColumn" in str(e): + years = [] db_years.extend(list(chain(*years))) not_inserted = [y for y in all_years if y not in db_years] @@ -124,8 +125,9 @@ def dbcs_to_fetch() -> dict: f' WHERE disease = "{disease}" AND prelim IS True' ) years = cur.all() - except UndefinedColumn: - years = [] + except Exception as e: + if "UndefinedColumn" in str(e): + years = [] db_years.extend(list(chain(*years))) prelim_to_final = [y for y in finals_years if y in db_prelimns] @@ -189,7 +191,7 @@ def upload_not_inserted(**kwargs) -> dict: conn.execute( f'INSERT INTO {schema}.sinan_update_ctl(' 'disease, year, prelim, last_insert) VALUES (' - f'{disease}, {year}, False, {ti.execution_date})' + f'"{disease}", {year}, False, {ti.execution_date})' ) cur = conn.execute( f'SELECT COUNT(*) FROM {schema}.{tablename}' @@ -210,7 +212,7 @@ def upload_not_inserted(**kwargs) -> dict: conn.execute( f'INSERT INTO {schema}.sinan_update_ctl(' 'disease, year, prelim, last_insert) VALUES (' - f'{disease}, {year}, True, {ti.execution_date})' + f'"{disease}", {year}, True, {ti.execution_date})' ) cur = conn.execute( f'SELECT COUNT(*) FROM {schema}.{tablename}' diff --git a/containers/airflow/scripts/startup.sh b/containers/airflow/scripts/startup.sh index b4daa6dd..3602a3e4 100755 --- a/containers/airflow/scripts/startup.sh +++ b/containers/airflow/scripts/startup.sh @@ -20,6 +20,9 @@ echo "========= airflow scheduler =========" airflow scheduler & sleep 10 +airflow dags reserialize +sleep 5 + # just to keep the 
prompt blocked mkdir -p /tmp/empty cd /tmp/empty From 491eb7a462a36f330ed89d95ee7e6085efd37636 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lu=C3=A3=20Bida=20Vacaro?= Date: Fri, 24 Mar 2023 15:09:49 -0300 Subject: [PATCH 08/22] Updating airflow to 2.5.2 --- conda/airflow.yaml | 2 +- containers/airflow/airflow.cfg | 4 +- containers/airflow/dags/brasil/sinan.py | 62 ++++++------------- .../airflow/dags/brasil/sinan_metadata.py | 2 +- containers/airflow/dags/colombia_dag.py | 2 +- .../airflow/dags/covidch_dash_for_dag.py | 4 +- .../airflow/dags/covidch_dash_train_dag.py | 4 +- containers/airflow/dags/foph_dag.py | 2 +- containers/airflow/dags/owid_dag.py | 2 +- .../dags/tests/webservices_status_dag.py | 2 +- 10 files changed, 31 insertions(+), 55 deletions(-) diff --git a/conda/airflow.yaml b/conda/airflow.yaml index 3353fd08..d9940b24 100644 --- a/conda/airflow.yaml +++ b/conda/airflow.yaml @@ -3,7 +3,7 @@ channels: - nodefaults - conda-forge dependencies: - - airflow 2.3.3 + - airflow 2.5.2 - fiona - geopandas - gsheetsdb diff --git a/containers/airflow/airflow.cfg b/containers/airflow/airflow.cfg index efff4813..5816b268 100644 --- a/containers/airflow/airflow.cfg +++ b/containers/airflow/airflow.cfg @@ -316,7 +316,7 @@ encrypt_s3_logs = False # Logging level. # # Supported values: ``CRITICAL``, ``ERROR``, ``WARNING``, ``INFO``, ``DEBUG``. -logging_level = INFO +logging_level = DEBUG # Logging level for celery. If not set, it uses the value of logging_level # @@ -960,7 +960,7 @@ min_file_process_interval = 30 # How often (in seconds) to check for stale DAGs (DAGs which are no longer present in # the expected files) which should be deactivated. -deactivate_stale_dags_interval = 60 +parsing_cleanup_interval = 60 # How often (in seconds) to scan the DAGs directory for new files. Default to 5 minutes. dag_dir_list_interval = 300 diff --git a/containers/airflow/dags/brasil/sinan.py b/containers/airflow/dags/brasil/sinan.py index d33e3fdc..15b0f39e 100644 --- a/containers/airflow/dags/brasil/sinan.py +++ b/containers/airflow/dags/brasil/sinan.py @@ -39,7 +39,6 @@ import pendulum import logging as logger from datetime import timedelta -from psycopg2.errors import UndefinedColumn from airflow.decorators import task from airflow.operators.empty import EmptyOperator @@ -345,51 +344,28 @@ def remove_parquets(**kwargs) -> None: ini >> dbcs >> E >> upload_new >> to_final >> prelims >> clean >> end +# DAGs +# Here its where the DAGs are created, an specific case can be specified +from airflow.models.dag import DAG from epigraphhub.data.brasil.sinan import DISEASES +from airflow.utils.dag_parsing_context import get_parsing_context +current_dag_id = get_parsing_context().dag_id -def create_dag( - disease: str, - schedule: str, - start: pendulum.datetime, -): - """ - This method will be responsible for creating a DAG for a - SINAN disease. It will receive the disease, its schedule - and the start date, returning a DAG with the task flow. 
- """ - from airflow import DAG - - sinan_tag = ['SINAN', 'Brasil'] - sinan_tag.append(disease) - DEFAULT_ARGS.update(start_date=start) - - dag = DAG( - 'SINAN_' + DISEASES[disease], - default_args=DEFAULT_ARGS, # Tasks and Dags - tags=sinan_tag, # Only DAGs - start_date=start, - catchup=False, - schedule_interval=schedule, - dagrun_timeout=timedelta(minutes=10), - ) - - with dag: - task_flow_for(disease) - - return dag - - -# DAGs -# Here its where the DAGs are created, an specific case can be specified for disease in DISEASES: - # Change DAG variables here - dag_id = 'SINAN_' + DISEASES[disease] - globals()[dag_id] = create_dag( - disease, - schedule='@monthly', - start=pendulum.datetime( + if current_dag_id is not None and current_dag_id != dag_id: + continue # skip generation of non-selected DAG + + with DAG( + dag_id=dag_id, + default_args=DEFAULT_ARGS, + tags=['SINAN', 'Brasil', disease], + start_date=pendulum.datetime( 2022, 2, len(disease) - ), # avoid memory overhead - ) + ), + catchup=False, + schedule='@monthly', + dagrun_timeout=timedelta(minutes=10), + ): + task_flow_for(disease) diff --git a/containers/airflow/dags/brasil/sinan_metadata.py b/containers/airflow/dags/brasil/sinan_metadata.py index 0de710e7..2ab2b80e 100644 --- a/containers/airflow/dags/brasil/sinan_metadata.py +++ b/containers/airflow/dags/brasil/sinan_metadata.py @@ -30,7 +30,7 @@ tags=['SINAN', 'Brasil', 'Metadata'], start_date=pendulum.datetime(2022, 2, 1), catchup=False, - schedule_interval='@once', + schedule='@once', ) def metadata_tables(): """ diff --git a/containers/airflow/dags/colombia_dag.py b/containers/airflow/dags/colombia_dag.py index b7b95be6..70660263 100644 --- a/containers/airflow/dags/colombia_dag.py +++ b/containers/airflow/dags/colombia_dag.py @@ -23,7 +23,7 @@ @dag( - schedule_interval="@daily", + schedule="@daily", default_args=default_args, catchup=False, ) diff --git a/containers/airflow/dags/covidch_dash_for_dag.py b/containers/airflow/dags/covidch_dash_for_dag.py index 7d70eabb..d45014db 100644 --- a/containers/airflow/dags/covidch_dash_for_dag.py +++ b/containers/airflow/dags/covidch_dash_for_dag.py @@ -43,7 +43,7 @@ @dag( - schedule_interval="@weekly", + schedule="@weekly", default_args=default_args, catchup=False, ) @@ -164,4 +164,4 @@ def up_for_total_icu(): start >> up_for_new_hosp() >> up_for_total_hosp() >> up_for_total_icu() >> end -dag = update_covidch() \ No newline at end of file +dag = update_covidch() diff --git a/containers/airflow/dags/covidch_dash_train_dag.py b/containers/airflow/dags/covidch_dash_train_dag.py index f181858b..75518471 100644 --- a/containers/airflow/dags/covidch_dash_train_dag.py +++ b/containers/airflow/dags/covidch_dash_train_dag.py @@ -45,7 +45,7 @@ @dag( - schedule_interval=timedelta(days =60), + schedule=timedelta(days =60), default_args=default_args, catchup=False, ) @@ -157,4 +157,4 @@ def train(canton, start >> train_new_hosp >> train_total_hosp >> train_total_icu >> end -dag = train_covidch() \ No newline at end of file +dag = train_covidch() diff --git a/containers/airflow/dags/foph_dag.py b/containers/airflow/dags/foph_dag.py index 003cad6d..6ab8f1e3 100644 --- a/containers/airflow/dags/foph_dag.py +++ b/containers/airflow/dags/foph_dag.py @@ -95,7 +95,7 @@ @dag( - schedule_interval="@weekly", + schedule="@weekly", default_args=default_args, catchup=False, tags = ['CHE', 'FOPH', 'Switzerland'] diff --git a/containers/airflow/dags/owid_dag.py b/containers/airflow/dags/owid_dag.py index 77701984..a732e5c8 100644 --- 
a/containers/airflow/dags/owid_dag.py +++ b/containers/airflow/dags/owid_dag.py @@ -79,7 +79,7 @@ @dag( - schedule_interval="@daily", + schedule="@daily", default_args=default_args, catchup=False, tags=['OWID'] diff --git a/containers/airflow/dags/tests/webservices_status_dag.py b/containers/airflow/dags/tests/webservices_status_dag.py index f7fb3c1c..08a957a9 100644 --- a/containers/airflow/dags/tests/webservices_status_dag.py +++ b/containers/airflow/dags/tests/webservices_status_dag.py @@ -23,7 +23,7 @@ @dag( - schedule_interval="@daily", + schedule="@daily", default_args=default_args, catchup=False, ) From 95df911997e46c1b45bd33bfc01dc5e1219e1082 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lu=C3=A3=20Bida=20Vacaro?= Date: Fri, 24 Mar 2023 17:50:41 -0300 Subject: [PATCH 09/22] fix SQL quotation mark error --- containers/airflow/dags/brasil/sinan.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/containers/airflow/dags/brasil/sinan.py b/containers/airflow/dags/brasil/sinan.py index 15b0f39e..0089d637 100644 --- a/containers/airflow/dags/brasil/sinan.py +++ b/containers/airflow/dags/brasil/sinan.py @@ -107,7 +107,7 @@ def dbcs_to_fetch() -> dict: try: cur = conn.execute( f'SELECT year FROM {schema}.sinan_update_ctl' - f' WHERE disease = "{disease}"' + f" WHERE disease = '{disease}'" ) years = cur.all() except Exception as e: @@ -121,7 +121,7 @@ def dbcs_to_fetch() -> dict: try: cur = conn.execute( f'SELECT year FROM {schema}.sinan_update_ctl' - f' WHERE disease = "{disease}" AND prelim IS True' + f" WHERE disease = '{disease}' AND prelim IS True" ) years = cur.all() except Exception as e: @@ -190,13 +190,13 @@ def upload_not_inserted(**kwargs) -> dict: conn.execute( f'INSERT INTO {schema}.sinan_update_ctl(' 'disease, year, prelim, last_insert) VALUES (' - f'"{disease}", {year}, False, {ti.execution_date})' + f"'{disease}', {year}, False, '{ti.execution_date}')" ) cur = conn.execute( f'SELECT COUNT(*) FROM {schema}.{tablename}' f' WHERE year = {year}' ) - inserted_rows[year] = cur.fetchone[0] + inserted_rows[year] = cur.fetchone()[0] for prelim_pq in prelims or []: year = get_year(prelim_pq) @@ -211,13 +211,13 @@ def upload_not_inserted(**kwargs) -> dict: conn.execute( f'INSERT INTO {schema}.sinan_update_ctl(' 'disease, year, prelim, last_insert) VALUES (' - f'"{disease}", {year}, True, {ti.execution_date})' + f"'{disease}', {year}, True, '{ti.execution_date}')" ) cur = conn.execute( f'SELECT COUNT(*) FROM {schema}.{tablename}' f' WHERE year = {year}' ) - inserted_rows[year] = cur.fetchone[0] + inserted_rows[year] = cur.fetchone()[0] return inserted_rows @@ -255,8 +255,8 @@ def update_prelim_to_final(**kwargs): with engine.connect() as conn: conn.execute( f'UPDATE {schema}.sinan_update_ctl' - f' SET prelim = False, last_insert = {ti.execution_date}' - f' WHERE disease = "{disease}" AND year = {year}' + f" SET prelim = False, last_insert = '{ti.execution_date}'" + f" WHERE disease = '{disease}' AND year = {year}" ) @task(task_id='update_prelims') @@ -288,7 +288,7 @@ def update_prelim_parquets(**kwargs): f' WHERE year = {year}' f' AND prelim = True' ) - old_rows = cur.fetchone[0] + old_rows = cur.fetchone()[0] upload_df(df) logger.info( @@ -302,8 +302,8 @@ def update_prelim_parquets(**kwargs): with engine.connect() as conn: conn.execute( f'UPDATE {schema}.sinan_update_ctl' - f' SET last_insert = {ti.execution_date}' - f' WHERE disease = "{disease}" AND year = {year}' + f" SET last_insert = '{ti.execution_date}'" + f" WHERE disease = '{disease}' AND 
year = {year}" ) @task(trigger_rule='all_done') From 824046d58bfc8b6d6ca2b62f3a5addc873db76b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lu=C3=A3=20Bida=20Vacaro?= Date: Fri, 24 Mar 2023 18:13:42 -0300 Subject: [PATCH 10/22] Fix remove_parquets task --- containers/airflow/dags/brasil/sinan.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/containers/airflow/dags/brasil/sinan.py b/containers/airflow/dags/brasil/sinan.py index 0089d637..38422811 100644 --- a/containers/airflow/dags/brasil/sinan.py +++ b/containers/airflow/dags/brasil/sinan.py @@ -319,9 +319,12 @@ def remove_parquets(**kwargs) -> None: pqts = ti.xcom_pull(task_ids='extract') parquet_dirs = list( - chain(*(pqts['to_insert'], pqts['to_finals'], pqts['to_update'])) + chain(*(pqts['pqs_to_insert'], pqts['pqs_to_finals'], pqts['pqs_to_update'])) ) + if not parquet_dirs: + raise AirflowSkipException() + for dir in parquet_dirs: shutil.rmtree(dir, ignore_errors=True) logger.warning(f'{dir} removed') From 23e88af72ce56c4777aa4c741926e4f841047baf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lu=C3=A3=20Bida=20Vacaro?= Date: Fri, 24 Mar 2023 18:26:19 -0300 Subject: [PATCH 11/22] Fix header docstrings --- containers/airflow/dags/brasil/sinan.py | 49 +++++++++++++------------ 1 file changed, 26 insertions(+), 23 deletions(-) diff --git a/containers/airflow/dags/brasil/sinan.py b/containers/airflow/dags/brasil/sinan.py index 38422811..1a20ebce 100644 --- a/containers/airflow/dags/brasil/sinan.py +++ b/containers/airflow/dags/brasil/sinan.py @@ -1,6 +1,6 @@ """ @author Luã Bida Vacaro | github.com/luabida -@date Last change on 2023-03-14 +@date Last change on 2023-03-24 This is an Airflow DAG. This DAG is responsible for running scripts for collecting data from PySUS SINAN. The API that fetches the data is @@ -14,27 +14,37 @@ ------------ start (PythonOperator): - This task is the start of the task flow. It will count the rows for - a disease and store it as a XCom value. + This task will create the control table, which will be responsible for + comparing the years for a disease in EGH SQL DB and the DBCs found on + DataSUS FTP server. It stores the information about the stage of the + year for the disease and when it was inserted. + +get_updates (PythonOperator): + This task will compare the preliminary and final years between the data + on EGH DB and the DBC files from the DataSUS FTP server. It will store + as a dict which years should be updated, which should pass to final or + insert it for the first time. extract (PythonOperator): - This task downloads parquet file from DataSUS via PySUS for a SINAN - disease. + This task will download every DBC from `get_updates` task as parquet. + +first_insertion (PythonOperator): + This task inserts every year from a SINAN disease that has not been found + on EGH DB. -upload (PythonOperator): - This task will upload a list of parquet files extracted into the EGH - Postgres Database, parsing the disease name according to the docs: - https://epigraphhub.readthedocs.io/en/latest/instruction_name_tables.html#all-names-schema-name-table-name-and-column-names +prelims_to_finals (PythonOperator): + This task will update the status for a year that have been passed from + preliminary to final. -diagnosis (PythonOperator): - This task will compare the number of rows before and after the insertion - and store the values as XComs. +update_prelims (PythonOperator): + This task will delete and re-insert the rows for every preliminary year + and log the amount of new rows inserted into EGH DB. 
-remove_parquets (PythonOperator): - This task will remove the parquet files returned from the extract task +all_done (PythonOperator): + This task will remove every parquet extracted in the `extract` task. -done (PythonOperator): - This task will fail if any task above fails, breaking the DAG. +end (EmptyOperator): + The end of the Task Flow. """ import pendulum import logging as logger @@ -82,12 +92,6 @@ def task_flow_for(disease: str): @task(task_id='start') def start_task(): - """ - Task to start the workflow, extracts all the last update date - for the each DBC file in FTP server. SINAN DAG will use the - previous start task run to decide rather the dbc should be - inserted into DB or not. - """ with engine.connect() as conn: conn.execute( f'CREATE TABLE IF NOT EXISTS {schema}.sinan_update_ctl (' @@ -309,7 +313,6 @@ def update_prelim_parquets(**kwargs): @task(trigger_rule='all_done') def remove_parquets(**kwargs) -> None: import shutil - """ This task will be responsible for deleting all parquet files downloaded. It will receive the same parquet dirs the `upload` From 5f266131429b0f0c469829322c7de2a8daeca318 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lu=C3=A3=20Bida=20Vacaro?= Date: Fri, 24 Mar 2023 18:49:04 -0300 Subject: [PATCH 12/22] Ignore index in to_sql --- containers/airflow/dags/brasil/sinan.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/containers/airflow/dags/brasil/sinan.py b/containers/airflow/dags/brasil/sinan.py index 1a20ebce..220586dd 100644 --- a/containers/airflow/dags/brasil/sinan.py +++ b/containers/airflow/dags/brasil/sinan.py @@ -85,9 +85,9 @@ def task_flow_for(disease: str): finals_years = list(map(int, FTP_SINAN(disease).get_years('finais'))) get_year = lambda file: int(str(file).split('.parquet')[0][-2:]) - + upload_df = lambda df: df.to_sql( - name=tablename, con=engine.connect(), schema=schema, if_exists='append' + name=tablename, con=engine.connect(), schema=schema, if_exists='append', index=False ) @task(task_id='start') From 15ee8d2a991c43db46485309a33d55ee6b284bc2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lu=C3=A3=20Bida=20Vacaro?= Date: Mon, 27 Mar 2023 10:13:10 -0300 Subject: [PATCH 13/22] Insert new columns dinamically if UndefinedColumn in to_df --- containers/airflow/dags/brasil/sinan.py | 127 +++++++++++++++--------- 1 file changed, 81 insertions(+), 46 deletions(-) diff --git a/containers/airflow/dags/brasil/sinan.py b/containers/airflow/dags/brasil/sinan.py index 220586dd..09486f08 100644 --- a/containers/airflow/dags/brasil/sinan.py +++ b/containers/airflow/dags/brasil/sinan.py @@ -49,6 +49,7 @@ import pendulum import logging as logger from datetime import timedelta +from sqlalchemy import MetaData, Table, Integer, Float, String, DateTime from airflow.decorators import task from airflow.operators.empty import EmptyOperator @@ -87,7 +88,11 @@ def task_flow_for(disease: str): get_year = lambda file: int(str(file).split('.parquet')[0][-2:]) upload_df = lambda df: df.to_sql( - name=tablename, con=engine.connect(), schema=schema, if_exists='append', index=False + name=tablename, + con=engine.connect(), + schema=schema, + if_exists='append', + index=False ) @task(task_id='start') @@ -118,6 +123,7 @@ def dbcs_to_fetch() -> dict: if "UndefinedColumn" in str(e): years = [] db_years.extend(list(chain(*years))) + # Compare years found in ctl table with FTP server not_inserted = [y for y in all_years if y not in db_years] db_prelimns = [] @@ -132,8 +138,9 @@ def dbcs_to_fetch() -> dict: if "UndefinedColumn" in str(e): years = [] 
db_years.extend(list(chain(*years))) - + # Get years that are not prelim anymore prelim_to_final = [y for y in finals_years if y in db_prelimns] + # Get prelims prelim_to_update = [y for y in prelim_years if y in db_prelimns] return dict( @@ -150,9 +157,9 @@ def extract_parquets(**kwargs) -> dict: years = ti.xcom_pull(task_ids='get_updates') extract_pqs = ( - lambda stage: extract.download(disease=disease, years=years[stage]) - if any(years[stage]) - else () + lambda stage: extract.download( + disease=disease, years=years[stage] + ) if any(years[stage]) else () ) return dict( @@ -181,47 +188,73 @@ def upload_not_inserted(**kwargs) -> dict: else prelims.append(get_year(parquet)) ) - for final_pq in finals or []: - year = get_year(final_pq) - df = viz.parquet(final_pq) - if df.empty: - raise ValueError('DataFrame is empty') - df['year'] = year - df['prelim'] = False - upload_df(df) - logger.info(f'{final_pq} inserted into db') - with engine.connect() as conn: - conn.execute( - f'INSERT INTO {schema}.sinan_update_ctl(' - 'disease, year, prelim, last_insert) VALUES (' - f"'{disease}', {year}, False, '{ti.execution_date}')" - ) - cur = conn.execute( - f'SELECT COUNT(*) FROM {schema}.{tablename}' - f' WHERE year = {year}' - ) - inserted_rows[year] = cur.fetchone()[0] - - for prelim_pq in prelims or []: - year = get_year(prelim_pq) - df = viz.parquet(prelim_pq) - if df.empty: - raise ValueError('DataFrame is empty') - df['year'] = year - df['prelim'] = True - upload_df(df) - logger.info(f'{prelim_pq} inserted into db') - with engine.connect() as conn: - conn.execute( - f'INSERT INTO {schema}.sinan_update_ctl(' - 'disease, year, prelim, last_insert) VALUES (' - f"'{disease}', {year}, True, '{ti.execution_date}')" - ) - cur = conn.execute( - f'SELECT COUNT(*) FROM {schema}.{tablename}' - f' WHERE year = {year}' - ) - inserted_rows[year] = cur.fetchone()[0] + def insert_parquerts(stage): + parquets = finals or [] if stage == 'finals' else prelims or [] + prelim = False if stage == 'finals' else True + + for parquet in parquets: + year = get_year(parquet) + df = viz.parquet(parquet) + + if df.empty: + raise ValueError('DataFrame is empty') + + df['year'] = year + df['prelim'] = prelim + df.columns = map(str.lower, df.columns) + try: + upload_df(df) + logger.info(f'{parquet} inserted into db') + except Exception as e: + if "UndefinedColumn" in str(e): + sql_dtypes = { + 'int64': Integer, + 'float64': Float, + 'object': String, + 'datetime64[ns]': DateTime, + } + metadata = MetaData() + metadata.reflect(bind=engine) + table = Table( + f'{schema}.{tablename}', + metadata, + autoload=True, + autoload_with=engine + ) + + tcolumns = [column.name for column in table.columns] + newcols = [c for c in df.columns if c not in tcolumns] + + insert_cols_query = f'ALTER TABLE {schema}.{tablename}' + for column in newcols: + t = df[column].dtype + sqlt = sql_dtypes[str(t)] + add_col = f' ADD COLUMN {column} {sqlt}' + if column == newcols[-1]: + add_col += ';' + else: + add_col += ',' + insert_cols_query += add_col + + with engine.connect() as conn: + conn.execute(insert_cols_query) + + with engine.connect() as conn: + conn.execute( + f'INSERT INTO {schema}.sinan_update_ctl(' + 'disease, year, prelim, last_insert) VALUES (' + f"'{disease}', {year}, {prelim}, '{ti.execution_date}')" + ) + cur = conn.execute( + f'SELECT COUNT(*) FROM {schema}.{tablename}' + f' WHERE year = {year}' + ) + inserted_rows[year] = cur.fetchone()[0] + + if finals: + insert_parquerts('finals') + if prelims: + insert_parquerts('prelims') return 
inserted_rows @@ -245,6 +278,7 @@ def update_prelim_to_final(**kwargs): raise ValueError('DataFrame is empty') df['year'] = year df['prelim'] = False + df.columns = map(str.lower, df.columns) with engine.connect() as conn: conn.execute( @@ -281,6 +315,7 @@ def update_prelim_parquets(**kwargs): raise ValueError('DataFrame is empty') df['year'] = year df['prelim'] = True + df.columns = map(str.lower, df.columns) with engine.connect() as conn: cur = conn.execute( From a216566ce6a4ce659bd0651fa69cfbf1dad9fba8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lu=C3=A3=20Bida=20Vacaro?= Date: Mon, 27 Mar 2023 10:54:37 -0300 Subject: [PATCH 14/22] Minor fixes --- containers/airflow/dags/brasil/sinan.py | 37 ++++++++++++------------- 1 file changed, 17 insertions(+), 20 deletions(-) diff --git a/containers/airflow/dags/brasil/sinan.py b/containers/airflow/dags/brasil/sinan.py index 09486f08..f3a0a1ae 100644 --- a/containers/airflow/dags/brasil/sinan.py +++ b/containers/airflow/dags/brasil/sinan.py @@ -49,7 +49,6 @@ import pendulum import logging as logger from datetime import timedelta -from sqlalchemy import MetaData, Table, Integer, Float, String, DateTime from airflow.decorators import task from airflow.operators.empty import EmptyOperator @@ -120,7 +119,7 @@ def dbcs_to_fetch() -> dict: ) years = cur.all() except Exception as e: - if "UndefinedColumn" in str(e): + if "UndefinedColumn" or "NoSuchTableError" in str(e): years = [] db_years.extend(list(chain(*years))) # Compare years found in ctl table with FTP server @@ -191,10 +190,10 @@ def upload_not_inserted(**kwargs) -> dict: def insert_parquerts(stage): parquets = finals or [] if stage == 'finals' else prelims or [] prelim = False if stage == 'finals' else True - + print(parquets) for parquet in parquets: year = get_year(parquet) - df = viz.parquet(parquet) + df = viz.parquet(str(parquet)) if df.empty: raise ValueError('DataFrame is empty') @@ -208,28 +207,26 @@ def insert_parquerts(stage): except Exception as e: if "UndefinedColumn" in str(e): sql_dtypes = { - 'int64': Integer, - 'float64': Float, - 'object': String, - 'datetime64[ns]': DateTime, + 'int64': 'INT', + 'float64': 'FLOAT', + 'string': 'TEXT', + 'object': 'TEXT', + 'datetime64[ns]': 'TEXT', } - metadata = MetaData() - metadata.reflect(bind=engine) - table = Table( - f'{schema}.{tablename}', - metadata, - autoload=True, - autoload_with=engine - ) - - tcolumns = [column.name for column in table.columns] + + with engine.connect() as conn: + cur = conn.execute( + f'SELECT * FROM {schema}.{tablename} LIMIT 0' + ) + tcolumns = cur.keys() + newcols = [c for c in df.columns if c not in tcolumns] insert_cols_query = f'ALTER TABLE {schema}.{tablename}' for column in newcols: t = df[column].dtype sqlt = sql_dtypes[str(t)] - add_col = f' ADD COLUMN {column} {sqlt}' + add_col = f' ADD COLUMN {column} {str(sqlt)}' if column == newcols[-1]: add_col += ';' else: @@ -250,7 +247,7 @@ def insert_parquerts(stage): f' WHERE year = {year}' ) inserted_rows[year] = cur.fetchone()[0] - + if finals: insert_parquerts('finals') if prelims: From ce7eecb4261ec4f92c7c6b441b6a9d15cfdd51ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lu=C3=A3=20Bida=20Vacaro?= Date: Mon, 27 Mar 2023 11:14:25 -0300 Subject: [PATCH 15/22] Minor changes --- containers/airflow/dags/brasil/sinan.py | 39 ++++++++++++++++++------- 1 file changed, 28 insertions(+), 11 deletions(-) diff --git a/containers/airflow/dags/brasil/sinan.py b/containers/airflow/dags/brasil/sinan.py index f3a0a1ae..56c7d54b 100644 --- 
a/containers/airflow/dags/brasil/sinan.py +++ b/containers/airflow/dags/brasil/sinan.py @@ -46,6 +46,7 @@ end (EmptyOperator): The end of the Task Flow. """ +import os import pendulum import logging as logger from datetime import timedelta @@ -169,7 +170,7 @@ def extract_parquets(**kwargs) -> dict: @task(task_id='first_insertion') def upload_not_inserted(**kwargs) -> dict: - from epigraphhub.data.brasil.sinan import viz + from pysus.online_data import parquets_to_dataframe ti = kwargs['ti'] parquets = ti.xcom_pull(task_ids='extract')['pqs_to_insert'] @@ -184,19 +185,23 @@ def upload_not_inserted(**kwargs) -> dict: ( finals.append(parquet) if get_year(parquet) in finals_years - else prelims.append(get_year(parquet)) + else prelims.append(parquet) ) def insert_parquerts(stage): parquets = finals or [] if stage == 'finals' else prelims or [] prelim = False if stage == 'finals' else True - print(parquets) + for parquet in parquets: + if not any(os.listdir(parquet)): + continue + year = get_year(parquet) - df = viz.parquet(str(parquet)) + df = parquets_to_dataframe(str(parquet)) if df.empty: - raise ValueError('DataFrame is empty') + logger('DataFrame is empty') + continue df['year'] = year df['prelim'] = prelim @@ -257,7 +262,7 @@ def insert_parquerts(stage): @task(task_id='prelims_to_finals') def update_prelim_to_final(**kwargs): - from epigraphhub.data.brasil.sinan import viz + from pysus.online_data import parquets_to_dataframe ti = kwargs['ti'] parquets = ti.xcom_pull(task_ids='extract')['pqs_to_finals'] @@ -269,10 +274,16 @@ def update_prelim_to_final(**kwargs): raise AirflowSkipException() for parquet in parquets: + if not any(os.listdir(parquet)): + continue + year = get_year(parquet) - df = viz.parquet(parquet) + df = parquets_to_dataframe(parquet) + if df.empty: - raise ValueError('DataFrame is empty') + logger.info('DataFrame is empty') + continue + df['year'] = year df['prelim'] = False df.columns = map(str.lower, df.columns) @@ -296,7 +307,7 @@ def update_prelim_to_final(**kwargs): @task(task_id='update_prelims') def update_prelim_parquets(**kwargs): - from epigraphhub.data.brasil.sinan import viz + from pysus.online_data import parquets_to_dataframe ti = kwargs['ti'] parquets = ti.xcom_pull(task_ids='extract')['pqs_to_update'] @@ -306,10 +317,16 @@ def update_prelim_parquets(**kwargs): raise AirflowSkipException() for parquet in parquets: + if not any(os.listdir(parquet)): + continue + year = get_year(parquet) - df = viz.parquet(parquet) + df = parquets_to_dataframe(parquet) + if df.empty: - raise ValueError('DataFrame is empty') + logger.info('DataFrame is empty') + continue + df['year'] = year df['prelim'] = True df.columns = map(str.lower, df.columns) From 1fe186d1ae1105cfee5f363ad31223b33acddb09 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lu=C3=A3=20Bida=20Vacaro?= Date: Mon, 27 Mar 2023 11:24:45 -0300 Subject: [PATCH 16/22] Fixes requested by fccoelho --- containers/airflow/dags/brasil/sinan.py | 69 ++++++++++++------------- 1 file changed, 34 insertions(+), 35 deletions(-) diff --git a/containers/airflow/dags/brasil/sinan.py b/containers/airflow/dags/brasil/sinan.py index 56c7d54b..09221c2a 100644 --- a/containers/airflow/dags/brasil/sinan.py +++ b/containers/airflow/dags/brasil/sinan.py @@ -50,6 +50,7 @@ import pendulum import logging as logger from datetime import timedelta +from psycopg2.errors import UndefinedColumn from airflow.decorators import task from airflow.operators.empty import EmptyOperator @@ -134,9 +135,8 @@ def dbcs_to_fetch() -> dict: f" WHERE disease = 
'{disease}' AND prelim IS True" ) years = cur.all() - except Exception as e: - if "UndefinedColumn" in str(e): - years = [] + except UndefinedColumn: + years = [] db_years.extend(list(chain(*years))) # Get years that are not prelim anymore prelim_to_final = [y for y in finals_years if y in db_prelimns] @@ -209,37 +209,36 @@ def insert_parquerts(stage): try: upload_df(df) logger.info(f'{parquet} inserted into db') - except Exception as e: - if "UndefinedColumn" in str(e): - sql_dtypes = { - 'int64': 'INT', - 'float64': 'FLOAT', - 'string': 'TEXT', - 'object': 'TEXT', - 'datetime64[ns]': 'TEXT', - } - - with engine.connect() as conn: - cur = conn.execute( - f'SELECT * FROM {schema}.{tablename} LIMIT 0' - ) - tcolumns = cur.keys() - - newcols = [c for c in df.columns if c not in tcolumns] - - insert_cols_query = f'ALTER TABLE {schema}.{tablename}' - for column in newcols: - t = df[column].dtype - sqlt = sql_dtypes[str(t)] - add_col = f' ADD COLUMN {column} {str(sqlt)}' - if column == newcols[-1]: - add_col += ';' - else: - add_col += ',' - insert_cols_query += add_col - - with engine.connect() as conn: - conn.execute(insert_cols_query) + except UndefinedColumn: + sql_dtypes = { + 'int64': 'INT', + 'float64': 'FLOAT', + 'string': 'TEXT', + 'object': 'TEXT', + 'datetime64[ns]': 'TEXT', + } + + with engine.connect() as conn: + cur = conn.execute( + f'SELECT * FROM {schema}.{tablename} LIMIT 0' + ) + tcolumns = cur.keys() + + newcols = [c for c in df.columns if c not in tcolumns] + + insert_cols_query = f'ALTER TABLE {schema}.{tablename}' + for column in newcols: + t = df[column].dtype + sqlt = sql_dtypes[str(t)] + add_col = f' ADD COLUMN {column} {str(sqlt)}' + if column == newcols[-1]: + add_col += ';' + else: + add_col += ',' + insert_cols_query += add_col + + with engine.connect() as conn: + conn.execute(insert_cols_query) with engine.connect() as conn: conn.execute( @@ -251,7 +250,7 @@ def insert_parquerts(stage): f'SELECT COUNT(*) FROM {schema}.{tablename}' f' WHERE year = {year}' ) - inserted_rows[year] = cur.fetchone()[0] + inserted_rows[str(year)] = cur.fetchone()[0] if finals: insert_parquerts('finals') From 55bc409804151e53a9f00fc36c05f63aa2edc3be Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lu=C3=A3=20Bida=20Vacaro?= Date: Mon, 27 Mar 2023 11:25:20 -0300 Subject: [PATCH 17/22] Fix typo --- containers/airflow/dags/brasil/sinan.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/containers/airflow/dags/brasil/sinan.py b/containers/airflow/dags/brasil/sinan.py index 09221c2a..121b1375 100644 --- a/containers/airflow/dags/brasil/sinan.py +++ b/containers/airflow/dags/brasil/sinan.py @@ -188,7 +188,7 @@ def upload_not_inserted(**kwargs) -> dict: else prelims.append(parquet) ) - def insert_parquerts(stage): + def insert_parquets(stage): parquets = finals or [] if stage == 'finals' else prelims or [] prelim = False if stage == 'finals' else True @@ -253,9 +253,9 @@ def insert_parquerts(stage): inserted_rows[str(year)] = cur.fetchone()[0] if finals: - insert_parquerts('finals') + insert_parquets('finals') if prelims: - insert_parquerts('prelims') + insert_parquets('prelims') return inserted_rows From d852bdc57be175911c8ddbdd87c23eb6b4b97ca0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lu=C3=A3=20Bida=20Vacaro?= Date: Mon, 27 Mar 2023 11:38:23 -0300 Subject: [PATCH 18/22] Rolling back UndefinedColumn error --- containers/airflow/dags/brasil/sinan.py | 70 +++++++++++++------------ 1 file changed, 37 insertions(+), 33 deletions(-) diff --git 
a/containers/airflow/dags/brasil/sinan.py b/containers/airflow/dags/brasil/sinan.py index 121b1375..985e4225 100644 --- a/containers/airflow/dags/brasil/sinan.py +++ b/containers/airflow/dags/brasil/sinan.py @@ -135,8 +135,9 @@ def dbcs_to_fetch() -> dict: f" WHERE disease = '{disease}' AND prelim IS True" ) years = cur.all() - except UndefinedColumn: - years = [] + except Exception as e: + if "UndefinedColumn" in str(e): + years = [] db_years.extend(list(chain(*years))) # Get years that are not prelim anymore prelim_to_final = [y for y in finals_years if y in db_prelimns] @@ -209,37 +210,38 @@ def insert_parquets(stage): try: upload_df(df) logger.info(f'{parquet} inserted into db') - except UndefinedColumn: - sql_dtypes = { - 'int64': 'INT', - 'float64': 'FLOAT', - 'string': 'TEXT', - 'object': 'TEXT', - 'datetime64[ns]': 'TEXT', - } + except Exception as e: + if "UndefinedColumn" in str(e): + sql_dtypes = { + 'int64': 'INT', + 'float64': 'FLOAT', + 'string': 'TEXT', + 'object': 'TEXT', + 'datetime64[ns]': 'TEXT', + } + + with engine.connect() as conn: + cur = conn.execute( + f'SELECT * FROM {schema}.{tablename} LIMIT 0' + ) + tcolumns = cur.keys() + + newcols = [c for c in df.columns if c not in tcolumns] + + insert_cols_query = f'ALTER TABLE {schema}.{tablename}' + for column in newcols: + t = df[column].dtype + sqlt = sql_dtypes[str(t)] + add_col = f' ADD COLUMN {column} {str(sqlt)}' + if column == newcols[-1]: + add_col += ';' + else: + add_col += ',' + insert_cols_query += add_col + + with engine.connect() as conn: + conn.execute(insert_cols_query) - with engine.connect() as conn: - cur = conn.execute( - f'SELECT * FROM {schema}.{tablename} LIMIT 0' - ) - tcolumns = cur.keys() - - newcols = [c for c in df.columns if c not in tcolumns] - - insert_cols_query = f'ALTER TABLE {schema}.{tablename}' - for column in newcols: - t = df[column].dtype - sqlt = sql_dtypes[str(t)] - add_col = f' ADD COLUMN {column} {str(sqlt)}' - if column == newcols[-1]: - add_col += ';' - else: - add_col += ',' - insert_cols_query += add_col - - with engine.connect() as conn: - conn.execute(insert_cols_query) - with engine.connect() as conn: conn.execute( f'INSERT INTO {schema}.sinan_update_ctl(' @@ -404,6 +406,8 @@ def remove_parquets(**kwargs) -> None: from epigraphhub.data.brasil.sinan import DISEASES from airflow.utils.dag_parsing_context import get_parsing_context +from random import randint + current_dag_id = get_parsing_context().dag_id for disease in DISEASES: @@ -416,7 +420,7 @@ def remove_parquets(**kwargs) -> None: default_args=DEFAULT_ARGS, tags=['SINAN', 'Brasil', disease], start_date=pendulum.datetime( - 2022, 2, len(disease) + 2022, 2, randint(1,28) ), catchup=False, schedule='@monthly', From 96ec31b30236b3bf846221792ce47ff10548e92e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lu=C3=A3=20Bida=20Vacaro?= Date: Mon, 27 Mar 2023 13:11:43 -0300 Subject: [PATCH 19/22] Fix var typo --- containers/airflow/dags/brasil/sinan.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/containers/airflow/dags/brasil/sinan.py b/containers/airflow/dags/brasil/sinan.py index 985e4225..ed947013 100644 --- a/containers/airflow/dags/brasil/sinan.py +++ b/containers/airflow/dags/brasil/sinan.py @@ -138,7 +138,7 @@ def dbcs_to_fetch() -> dict: except Exception as e: if "UndefinedColumn" in str(e): years = [] - db_years.extend(list(chain(*years))) + db_prelimns.extend(list(chain(*years))) # Get years that are not prelim anymore prelim_to_final = [y for y in finals_years if y in db_prelimns] # Get prelims From 
58dee3492509474e8cc7b871953654681b81a688 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lu=C3=A3=20Bida=20Vacaro?= Date: Mon, 27 Mar 2023 13:25:16 -0300 Subject: [PATCH 20/22] Fixing task trigger rules --- containers/airflow/dags/brasil/sinan.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/containers/airflow/dags/brasil/sinan.py b/containers/airflow/dags/brasil/sinan.py index ed947013..be4755e8 100644 --- a/containers/airflow/dags/brasil/sinan.py +++ b/containers/airflow/dags/brasil/sinan.py @@ -169,7 +169,7 @@ def extract_parquets(**kwargs) -> dict: pqs_to_update=extract_pqs('to_update'), ) - @task(task_id='first_insertion') + @task(task_id='first_insertion', trigger_rule='all_done') def upload_not_inserted(**kwargs) -> dict: from pysus.online_data import parquets_to_dataframe @@ -261,7 +261,7 @@ def insert_parquets(stage): return inserted_rows - @task(task_id='prelims_to_finals') + @task(task_id='prelims_to_finals', trigger_rule='all_done') def update_prelim_to_final(**kwargs): from pysus.online_data import parquets_to_dataframe @@ -306,7 +306,7 @@ def update_prelim_to_final(**kwargs): f" WHERE disease = '{disease}' AND year = {year}" ) - @task(task_id='update_prelims') + @task(task_id='update_prelims', trigger_rule='all_done') def update_prelim_parquets(**kwargs): from pysus.online_data import parquets_to_dataframe From ca12983bcffc9ff8f047fc835c4743f1330a04f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lu=C3=A3=20Bida=20Vacaro?= Date: Mon, 27 Mar 2023 14:38:38 -0300 Subject: [PATCH 21/22] Task to drop a SINAN table manually --- .../airflow/dags/brasil/sinan_drop_table.py | 55 +++++++++++++++++++ 1 file changed, 55 insertions(+) create mode 100644 containers/airflow/dags/brasil/sinan_drop_table.py diff --git a/containers/airflow/dags/brasil/sinan_drop_table.py b/containers/airflow/dags/brasil/sinan_drop_table.py new file mode 100644 index 00000000..3bab1f03 --- /dev/null +++ b/containers/airflow/dags/brasil/sinan_drop_table.py @@ -0,0 +1,55 @@ +import pendulum +import logging as logger + +from datetime import timedelta +from epigraphhub.data.brasil.sinan import normalize_str + +from airflow import DAG +from airflow.decorators import dag +from airflow.operators.python import PythonOperator + +from epigraphhub.settings import env +from epigraphhub.connection import get_engine +from epigraphhub.data.brasil.sinan import ( + DISEASES, + normalize_str, +) + +DEFAULT_ARGS = { + 'owner': 'Admin', + 'depends_on_past': False, + 'email': ['epigraphhub@thegraphnetwork.org'], + 'email_on_failure': True, + 'email_on_retry': False, + 'retries': 2, + 'retry_delay': timedelta(minutes=2), +} + +engine = get_engine(credential_name=env.db.default_credential) + +with DAG( + dag_id='SINAN_DROP_TABLE', + default_args=DEFAULT_ARGS, + tags=['SINAN', 'CTL'], + start_date=pendulum.datetime(2023, 3, 27), + catchup=False, + schedule=None, #Only manually triggered + description='A DAG to delete a SINAN table in EGH db', +): + def drop_table(disease: str): + dis = normalize_str(disease) + tablename = 'sinan_' + dis + '_m' + with engine.connect() as conn: + conn.execute( + f'DROP TABLE brasil.{tablename}' + ) + logger.warn(f'Dropped table {tablename} on schema brasil') + + delete_table_task = PythonOperator( + task_id='drop_table', + python_callable=drop_table, + op_kwargs={'disease': '{{ params.disease }}'}, + params={'disease': 'disease'}, + ) + + delete_table_task From ab2b26d46cbcef3de7cd00fadd16915581d94b81 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lu=C3=A3=20Bida=20Vacaro?= Date: Tue, 28 Mar 
2023 14:48:05 -0300 Subject: [PATCH 22/22] psycopg2 exception and passing a list to remove tables dag --- containers/airflow/dags/brasil/sinan.py | 8 ++--- .../airflow/dags/brasil/sinan_drop_table.py | 29 ++++++++++--------- 2 files changed, 18 insertions(+), 19 deletions(-) diff --git a/containers/airflow/dags/brasil/sinan.py b/containers/airflow/dags/brasil/sinan.py index be4755e8..82c92337 100644 --- a/containers/airflow/dags/brasil/sinan.py +++ b/containers/airflow/dags/brasil/sinan.py @@ -48,9 +48,9 @@ """ import os import pendulum +import psycopg2 import logging as logger from datetime import timedelta -from psycopg2.errors import UndefinedColumn from airflow.decorators import task from airflow.operators.empty import EmptyOperator @@ -135,9 +135,8 @@ def dbcs_to_fetch() -> dict: f" WHERE disease = '{disease}' AND prelim IS True" ) years = cur.all() - except Exception as e: - if "UndefinedColumn" in str(e): - years = [] + except psycopg2.errors.lookup("42703"): #this is the UndefinedColumn code + years = [] db_prelimns.extend(list(chain(*years))) # Get years that are not prelim anymore prelim_to_final = [y for y in finals_years if y in db_prelimns] @@ -320,7 +319,6 @@ def update_prelim_parquets(**kwargs): for parquet in parquets: if not any(os.listdir(parquet)): continue - year = get_year(parquet) df = parquets_to_dataframe(parquet) diff --git a/containers/airflow/dags/brasil/sinan_drop_table.py b/containers/airflow/dags/brasil/sinan_drop_table.py index 3bab1f03..1f877792 100644 --- a/containers/airflow/dags/brasil/sinan_drop_table.py +++ b/containers/airflow/dags/brasil/sinan_drop_table.py @@ -36,20 +36,21 @@ schedule=None, #Only manually triggered description='A DAG to delete a SINAN table in EGH db', ): - def drop_table(disease: str): - dis = normalize_str(disease) - tablename = 'sinan_' + dis + '_m' - with engine.connect() as conn: - conn.execute( - f'DROP TABLE brasil.{tablename}' - ) - logger.warn(f'Dropped table {tablename} on schema brasil') - - delete_table_task = PythonOperator( + def drop_tables(diseases: list): + for disease in diseases: + dis = normalize_str(disease) + tablename = 'sinan_' + dis + '_m' + with engine.connect() as conn: + conn.execute( + f'DROP TABLE brasil.{tablename}' + ) + logger.warn(f'Dropped table {tablename} on schema brasil') + + delete_tables_task = PythonOperator( task_id='drop_table', - python_callable=drop_table, - op_kwargs={'disease': '{{ params.disease }}'}, - params={'disease': 'disease'}, + python_callable=drop_tables, + op_kwargs={'diseases': '{{ params.diseases }}'}, + params={'diseases': 'diseases'}, ) - delete_table_task + delete_tables_task
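
Note on the schema-sync behaviour that patches 13 through 22 iterate on: append the year's DataFrame, and when Postgres rejects it because the SINAN file carries columns the target table does not yet have, add those columns with ALTER TABLE and append again. The sketch below is a minimal, hypothetical restatement of that pattern, not the DAG's actual task body; the function name and its engine/schema/tablename arguments are placeholders, and the dtype-to-SQL map mirrors the one used in upload_not_inserted.

import pandas as pd
import psycopg2
from sqlalchemy.exc import ProgrammingError

SQL_DTYPES = {
    'int64': 'INT',
    'float64': 'FLOAT',
    'string': 'TEXT',
    'object': 'TEXT',
    'datetime64[ns]': 'TEXT',
}

def append_with_schema_sync(df: pd.DataFrame, engine, schema: str, tablename: str) -> None:
    """Append df to schema.tablename; if Postgres reports an undefined
    column, add the missing columns to the table and append once more."""
    df.columns = map(str.lower, df.columns)
    upload = lambda d: d.to_sql(
        name=tablename, con=engine.connect(), schema=schema, if_exists='append'
    )
    try:
        upload(df)
    except ProgrammingError as e:
        # SQLAlchemy wraps the DBAPI error; the psycopg2 class sits in e.orig
        if not isinstance(e.orig, psycopg2.errors.lookup('42703')):  # UndefinedColumn
            raise
        # Introspect the existing columns without fetching any rows
        with engine.connect() as conn:
            cur = conn.execute(f'SELECT * FROM {schema}.{tablename} LIMIT 0')
            table_cols = set(cur.keys())
        new_cols = [c for c in df.columns if c not in table_cols]
        if not new_cols:
            raise
        add_cols = ', '.join(
            f"ADD COLUMN {col} {SQL_DTYPES.get(str(df[col].dtype), 'TEXT')}"
            for col in new_cols
        )
        with engine.connect() as conn:
            conn.execute(f'ALTER TABLE {schema}.{tablename} {add_cols};')
        upload(df)

Catching sqlalchemy.exc.ProgrammingError and inspecting e.orig is one way to detect the undefined-column case through the SQLAlchemy connection that to_sql uses, without matching on the exception's string form; whether that is preferable to the string check the DAG currently uses is a judgement call.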
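
The bookkeeping in dbcs_to_fetch() reduces to a small pure function: compare the years published on the FTP server (split into preliminary and final) with what the sinan_update_ctl table already records. The helper below only illustrates that comparison with assumed argument names, which makes the logic easy to unit-test without a database:

def classify_years(prelim_years, finals_years, db_years, db_prelims):
    """Decide which years are new, which were preliminary in the database
    and are now final on the FTP server, and which preliminary years only
    need a refresh."""
    all_years = set(prelim_years) | set(finals_years)
    not_inserted = sorted(y for y in all_years if y not in db_years)
    prelim_to_final = sorted(y for y in finals_years if y in db_prelims)
    prelim_to_update = sorted(y for y in prelim_years if y in db_prelims)
    return dict(
        to_insert=not_inserted,
        to_finals=prelim_to_final,
        to_update=prelim_to_update,
    )

# Example:
# classify_years([2022, 2023], [2020, 2021], [2020, 2021, 2022], [2021, 2022])
# -> {'to_insert': [2023], 'to_finals': [2021], 'to_update': [2022]}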
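
One caveat worth flagging for the new SINAN_DROP_TABLE DAG: Jinja renders '{{ params.diseases }}' to a string by default, so the drop_tables callable would receive the list's text representation rather than a list. A hypothetical variant (DAG id, default params, and the callable body here are illustrative, not part of the patch) is to let Airflow render templates to native Python objects:

import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator

def drop_tables(diseases: list):
    # Receives an actual list, not its repr(), thanks to native rendering
    for disease in diseases:
        print(f'would drop the table for {disease}')

with DAG(
    dag_id='SINAN_DROP_TABLE_EXAMPLE',
    start_date=pendulum.datetime(2023, 3, 27),
    schedule=None,
    catchup=False,
    render_template_as_native_obj=True,  # Jinja returns native objects
    params={'diseases': []},
):
    PythonOperator(
        task_id='drop_tables',
        python_callable=drop_tables,
        op_kwargs={'diseases': '{{ params.diseases }}'},
    )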