diff --git a/conda/airflow.yaml b/conda/airflow.yaml
index 3353fd08..d9940b24 100644
--- a/conda/airflow.yaml
+++ b/conda/airflow.yaml
@@ -3,7 +3,7 @@ channels:
   - nodefaults
   - conda-forge
 dependencies:
-  - airflow 2.3.3
+  - airflow 2.5.2
   - fiona
   - geopandas
   - gsheetsdb
diff --git a/containers/airflow/airflow.cfg b/containers/airflow/airflow.cfg
index b0425a50..5816b268 100644
--- a/containers/airflow/airflow.cfg
+++ b/containers/airflow/airflow.cfg
@@ -96,7 +96,7 @@ execute_tasks_new_python_interpreter = False
 donot_pickle = True

 # How long before timing out a python file import
-dagbag_import_timeout = 30.0
+dagbag_import_timeout = 300.0

 # Should a traceback be shown in the UI for dagbag import errors,
 # instead of just the exception message
@@ -316,7 +316,7 @@ encrypt_s3_logs = False
 # Logging level.
 #
 # Supported values: ``CRITICAL``, ``ERROR``, ``WARNING``, ``INFO``, ``DEBUG``.
-logging_level = INFO
+logging_level = DEBUG

 # Logging level for celery. If not set, it uses the value of logging_level
 #
@@ -881,7 +881,7 @@ pool = prefork

 # The number of seconds to wait before timing out ``send_task_to_executor`` or
 # ``fetch_celery_task_state`` operations.
-operation_timeout = 1.0
+operation_timeout = 5.0

 # Celery task will report its status as 'started' when the task is executed by a worker.
 # This is used in Airflow to keep track of the running tasks and if a Scheduler is restarted
@@ -960,7 +960,7 @@ min_file_process_interval = 30

 # How often (in seconds) to check for stale DAGs (DAGs which are no longer present in
 # the expected files) which should be deactivated.
-deactivate_stale_dags_interval = 60
+parsing_cleanup_interval = 60

 # How often (in seconds) to scan the DAGs directory for new files. Default to 5 minutes.
 dag_dir_list_interval = 300
diff --git a/containers/airflow/dags/brasil/sinan.py b/containers/airflow/dags/brasil/sinan.py
index 06478a44..82c92337 100644
--- a/containers/airflow/dags/brasil/sinan.py
+++ b/containers/airflow/dags/brasil/sinan.py
@@ -1,9 +1,9 @@
-"""
+"""
 @author Luã Bida Vacaro | github.com/luabida
-@date Last change on 2023-03-14
+@date Last change on 2023-03-24
 This is an Airflow DAG. This DAG is responsible for running scripts for
-collecting data from PySUS SINAN. The API that fetches the data is
+collecting data from PySUS SINAN. The API that fetches the data is
 available on: https://github.com/AlertaDengue/PySUS
 A detailed article about the Airflow used in EpiGraphHub can be found
@@ -13,61 +13,58 @@ Task Summary
 ------------

-start (PythonOperator):
-    This task is the start of the task flow. It will count the rows for
-    a disease and store it as a XCom value.
+start (PythonOperator):
+    This task will create the control table, which is responsible for
+    comparing the years for a disease in the EGH SQL DB with the DBCs
+    found on the DataSUS FTP server. It stores the stage of each year for
+    the disease and when it was inserted.
+
+get_updates (PythonOperator):
+    This task will compare the preliminary and final years between the data
+    on the EGH DB and the DBC files on the DataSUS FTP server. It stores,
+    as a dict, which years should be updated, which should be moved to
+    final, and which should be inserted for the first time.

 extract (PythonOperator):
-    This task downloads parquet file from DataSUS via PySUS for a SINAN
-    disease.
+    This task will download every DBC listed by the `get_updates` task as
+    parquet files.

-upload (PythonOperator):
-    This task will upload a list of parquet files extracted into the EGH
-    Postgres Database, parsing the disease name according to the docs:
-    https://epigraphhub.readthedocs.io/en/latest/instruction_name_tables.html#all-names-schema-name-table-name-and-column-names
+first_insertion (PythonOperator):
+    This task inserts every year of a SINAN disease that has not yet been
+    found on the EGH DB.

-diagnosis (PythonOperator):
-    This task will compare the number of rows before and after the insertion
-    and store the values as XComs.
+prelims_to_finals (PythonOperator):
+    This task will update the status of a year that has been moved from
+    preliminary to final.

-remove_parquets (PythonOperator):
-    This task will remove the parquet files returned from the extract task
+update_prelims (PythonOperator):
+    This task will delete and re-insert the rows for every preliminary year
+    and log the number of new rows inserted into the EGH DB.

-done (PythonOperator):
-    This task will fail if any task above fails, breaking the DAG.
+remove_parquets (PythonOperator):
+    This task will remove every parquet extracted in the `extract` task.

+end (EmptyOperator):
+    The end of the Task Flow.
 """
-import shutil
+import os
 import pendulum
-import pandas as pd
+import psycopg2
 import logging as logger
-
 from datetime import timedelta
-from pysus.online_data import SINAN
-from airflow import DAG
-from airflow.decorators import task, dag
+from airflow.decorators import task
+from airflow.operators.empty import EmptyOperator
 from epigraphhub.settings import env
-from epigraphhub.connection import get_engine
-from epigraphhub.data.brasil.sinan import (
-    FTP_SINAN,
-    extract,
-    loading,
-    DISEASES,
-    normalize_str,
-)
-
-ENG = get_engine(credential_name=env.db.default_credential)
-SCHEMA = "brasil"
+

 DEFAULT_ARGS = {
-    "owner": "epigraphhub",
-    "depends_on_past": False,
-    "email": ["epigraphhub@thegraphnetwork.org"],
-    "email_on_failure": True,
-    "email_on_retry": False,
-    "retries": 2,
-    "retry_delay": timedelta(minutes=2),
+    'owner': 'epigraphhub',
+    'depends_on_past': False,
+    'email': ['epigraphhub@thegraphnetwork.org'],
+    'email_on_failure': True,
+    'email_on_retry': False,
+    'retries': 2,
+    'retry_delay': timedelta(minutes=2),
 }
@@ -77,189 +74,354 @@ def task_flow_for(disease: str):
     creating the task dependencies for the SINAN disease that is passed in.
     SINAN DAGs will have the same workflow.
     """
+    from itertools import chain
+    from epigraphhub.connection import get_engine
+    from airflow.exceptions import AirflowSkipException
+    from epigraphhub.data.brasil.sinan import FTP_SINAN, normalize_str
+
+    schema = 'brasil'
+    tablename = 'sinan_' + normalize_str(disease) + '_m'
+    engine = get_engine(credential_name=env.db.default_credential)
+
+    prelim_years = list(map(int, FTP_SINAN(disease).get_years('prelim')))
+    finals_years = list(map(int, FTP_SINAN(disease).get_years('finais')))
+
+    get_year = lambda file: int(str(file).split('.parquet')[0][-2:])
+
+    upload_df = lambda df: df.to_sql(
+        name=tablename,
+        con=engine.connect(),
+        schema=schema,
+        if_exists='append',
+        index=False
+    )
-    tablename = "sinan_" + normalize_str(disease) + "_m"
-
-    def _count_table_rows() -> dict:
-        """
-        Counts table rows from brasil's Schema
-        """
-        with ENG.connect() as conn:
+    @task(task_id='start')
+    def start_task():
+        with engine.connect() as conn:
+            conn.execute(
+                f'CREATE TABLE IF NOT EXISTS {schema}.sinan_update_ctl ('
+                ' disease TEXT NOT NULL,'
+                ' year INT NOT NULL,'
+                ' prelim BOOL NOT NULL,'
+                ' last_insert DATE'
+                ')'
+            )
+
+    @task(task_id='get_updates')
+    def dbcs_to_fetch() -> dict:
+        all_years = prelim_years + finals_years
+
+        db_years = []
+        with engine.connect() as conn:
             try:
-                cur = conn.execute(f"SELECT COUNT(*) FROM {SCHEMA}.{tablename}")
-                rowcount = cur.fetchone()[0]
+                cur = conn.execute(
+                    f'SELECT year FROM {schema}.sinan_update_ctl'
+                    f" WHERE disease = '{disease}'"
+                )
+                years = cur.all()
             except Exception as e:
-                if "UndefinedTable" in str(e):
-                    return dict(rows=0)
-                else:
-                    raise e
-        return dict(rows=rowcount)
-
-    @task(task_id="start")
-    def start() -> int:
-        """
-        Task to start the workflow, will read the database and return
-        the rows count for a SINAN disease.
-        """
-        logger.info(f"ETL started for {disease}")
-        return _count_table_rows()
+                if "UndefinedColumn" in str(e) or "NoSuchTableError" in str(e):
+                    years = []
+                else:
+                    raise e
+        db_years.extend(list(chain(*years)))
+        # Compare years found in ctl table with FTP server
+        not_inserted = [y for y in all_years if y not in db_years]
+
+        db_prelimns = []
+        with engine.connect() as conn:
+            try:
+                cur = conn.execute(
+                    f'SELECT year FROM {schema}.sinan_update_ctl'
+                    f" WHERE disease = '{disease}' AND prelim IS True"
+                )
+                years = cur.all()
+            except psycopg2.errors.lookup("42703"):  # this is the UndefinedColumn code
+                years = []
+        db_prelimns.extend(list(chain(*years)))
+        # Get years that are not prelim anymore
+        prelim_to_final = [y for y in finals_years if y in db_prelimns]
+        # Get prelims
+        prelim_to_update = [y for y in prelim_years if y in db_prelimns]
+
+        return dict(
+            to_insert=not_inserted,
+            to_finals=prelim_to_final,
+            to_update=prelim_to_update,
+        )
+
+    @task(task_id='extract')
+    def extract_parquets(**kwargs) -> dict:
+        from epigraphhub.data.brasil.sinan import extract
+
+        ti = kwargs['ti']
+        years = ti.xcom_pull(task_ids='get_updates')
+
+        extract_pqs = (
+            lambda stage: extract.download(
+                disease=disease, years=years[stage]
+            ) if any(years[stage]) else ()
+        )
+
+        return dict(
+            pqs_to_insert=extract_pqs('to_insert'),
+            pqs_to_finals=extract_pqs('to_finals'),
+            pqs_to_update=extract_pqs('to_update'),
+        )
+
+    @task(task_id='first_insertion', trigger_rule='all_done')
+    def upload_not_inserted(**kwargs) -> dict:
+        from pysus.online_data import parquets_to_dataframe
+
+        ti = kwargs['ti']
+        parquets = ti.xcom_pull(task_ids='extract')['pqs_to_insert']
+        inserted_rows = dict()
+
+        if not parquets:
+            logger.info('There are no new DBCs to insert into the DB')
+            raise AirflowSkipException()
+
+        finals, prelims = ([], [])
+        for parquet in parquets:
+            (
+                finals.append(parquet)
+                if get_year(parquet) in finals_years
+                else prelims.append(parquet)
+            )
+
+        def insert_parquets(stage):
+            parquets = finals or [] if stage == 'finals' else prelims or []
+            prelim = False if stage == 'finals' else True
+
+            for parquet in parquets:
+                if not any(os.listdir(parquet)):
+                    continue
+
+                year = get_year(parquet)
+                df = parquets_to_dataframe(str(parquet))
+
+                if df.empty:
+                    logger.info('DataFrame is empty')
+                    continue
+
+                df['year'] = year
+                df['prelim'] = prelim
+                df.columns = map(str.lower, df.columns)
+                try:
+                    upload_df(df)
+                    logger.info(f'{parquet} inserted into db')
+                except Exception as e:
+                    if "UndefinedColumn" in str(e):
+                        sql_dtypes = {
+                            'int64': 'INT',
+                            'float64': 'FLOAT',
+                            'string': 'TEXT',
+                            'object': 'TEXT',
+                            'datetime64[ns]': 'TEXT',
+                        }
+
+                        with engine.connect() as conn:
+                            cur = conn.execute(
+                                f'SELECT * FROM {schema}.{tablename} LIMIT 0'
+                            )
+                            tcolumns = cur.keys()
+
+                        newcols = [c for c in df.columns if c not in tcolumns]
+
+                        insert_cols_query = f'ALTER TABLE {schema}.{tablename}'
+                        for column in newcols:
+                            t = df[column].dtype
+                            sqlt = sql_dtypes[str(t)]
+                            add_col = f' ADD COLUMN {column} {str(sqlt)}'
+                            if column == newcols[-1]:
+                                add_col += ';'
+                            else:
+                                add_col += ','
+                            insert_cols_query += add_col
+
+                        with engine.connect() as conn:
+                            conn.execute(insert_cols_query)
+
+                with engine.connect() as conn:
+                    conn.execute(
+                        f'INSERT INTO {schema}.sinan_update_ctl('
+                        'disease, year, prelim, last_insert) VALUES ('
+                        f"'{disease}', {year}, {prelim}, '{ti.execution_date}')"
+                    )
+                    cur = conn.execute(
+                        f'SELECT COUNT(*) FROM {schema}.{tablename}'
+                        f' WHERE year = {year}'
+                    )
+                    inserted_rows[str(year)] = cur.fetchone()[0]
+
+        if finals:
+            insert_parquets('finals')
+        if prelims:
+            insert_parquets('prelims')
+
+        return inserted_rows
+
+    @task(task_id='prelims_to_finals', trigger_rule='all_done')
+    def update_prelim_to_final(**kwargs):
+        from pysus.online_data import parquets_to_dataframe
+
+        ti = kwargs['ti']
+        parquets = ti.xcom_pull(task_ids='extract')['pqs_to_finals']
+
+        if not parquets:
+            logger.info(
+                'Did not find any preliminary DBC that has been moved to final'
+            )
+            raise AirflowSkipException()
+
+        for parquet in parquets:
+            if not any(os.listdir(parquet)):
+                continue
+
+            year = get_year(parquet)
+            df = parquets_to_dataframe(parquet)
+
+            if df.empty:
+                logger.info('DataFrame is empty')
+                continue
+
+            df['year'] = year
+            df['prelim'] = False
+            df.columns = map(str.lower, df.columns)
+
+            with engine.connect() as conn:
+                conn.execute(
+                    f'DELETE FROM {schema}.{tablename}'
+                    f' WHERE year = {year}'
+                    f' AND prelim = True'
+                )
-    @task(task_id="extract", retries=3)
-    def download(disease: str) -> list:
-        """
-        This task is responsible for downloading every year found for
-        a disease. It will download at `/tmp/pysus/` and return a list
-        with downloaded parquet paths.
-        """
-        years = FTP_SINAN(disease).get_years()
-        parquet_dirs = extract.download(disease=disease, years=years)
-        logger.info(f"Data for {disease} extracted")
-        return parquet_dirs
+            upload_df(df)
+            logger.info(f'{parquet} data updated from prelim to final.')
-    @task(task_id="upload")
-    def upload(disease: str, **kwargs) -> None:
-        """
-        This task is responsible for uploading each parquet dir into
-        postgres database. It receives the disease name and the xcom
-        from `download` task to insert.
-        """
-        ti = kwargs["ti"]
-        parquets_dirs = ti.xcom_pull(task_ids="extract")
-        for dir in parquets_dirs:
-            try:
-                loading.upload(disease=disease, parquet_dir=dir)
-                logger.info(f"{dir} inserted into db")
-            except Exception as e:
-                logger.error(e)
-                raise e
+            with engine.connect() as conn:
+                conn.execute(
+                    f'UPDATE {schema}.sinan_update_ctl'
+                    f" SET prelim = False, last_insert = '{ti.execution_date}'"
+                    f" WHERE disease = '{disease}' AND year = {year}"
+                )
-    @task(task_id="diagnosis")
-    def compare_tables_rows(**kwargs) -> int:
-        """
-        This task will be responsible for checking how many rows were
-        inserted into a disease table. It will compare with the start
-        task and store the difference as a xcom.
-        """
-        ti = kwargs["ti"]
-        ini_rows_amt = ti.xcom_pull(task_ids="start")
-        end_rows_amt = _count_table_rows()
+    @task(task_id='update_prelims', trigger_rule='all_done')
+    def update_prelim_parquets(**kwargs):
+        from pysus.online_data import parquets_to_dataframe
+
+        ti = kwargs['ti']
+        parquets = ti.xcom_pull(task_ids='extract')['pqs_to_update']
-        new_rows = end_rows_amt["rows"] - ini_rows_amt["rows"]
+        if not parquets:
+            logger.info('No preliminary parquet found to update')
+            raise AirflowSkipException()
-        logger.info(f"{new_rows} new rows inserted into brasil.{tablename}")
+        for parquet in parquets:
+            if not any(os.listdir(parquet)):
+                continue
+            year = get_year(parquet)
+            df = parquets_to_dataframe(parquet)
-        ti.xcom_push(key="rows", value=ini_rows_amt["rows"])
-        ti.xcom_push(key="new_rows", value=new_rows)
+            if df.empty:
+                logger.info('DataFrame is empty')
+                continue
-    @task(trigger_rule="all_done")
+            df['year'] = year
+            df['prelim'] = True
+            df.columns = map(str.lower, df.columns)
+
+            with engine.connect() as conn:
+                cur = conn.execute(
+                    f'SELECT COUNT(*) FROM {schema}.{tablename}'
+                    f' WHERE year = {year}'
+                )
+                conn.execute(
+                    f'DELETE FROM {schema}.{tablename}'
+                    f' WHERE year = {year}'
+                    f' AND prelim = True'
+                )
+                old_rows = cur.fetchone()[0]
+
+            upload_df(df)
+            logger.info(
+                f'{parquet} data updated'
+                '\n~~~~~ '
+                f'\nRows inserted: {len(df)}'
+                f'\nNew rows: {len(df) - int(old_rows)}'
+                '\n~~~~~ '
+            )
+
+            with engine.connect() as conn:
+                conn.execute(
+                    f'UPDATE {schema}.sinan_update_ctl'
+                    f" SET last_insert = '{ti.execution_date}'"
+                    f" WHERE disease = '{disease}' AND year = {year}"
+                )
+
+    @task(trigger_rule='all_done')
     def remove_parquets(**kwargs) -> None:
         """
         This task will be responsible for deleting all parquet files
         downloaded. It will receive the same parquet dirs the `upload`
         task receives and delete all them.
         """
+        import shutil
+
-        ti = kwargs["ti"]
-        parquet_dirs = ti.xcom_pull(task_ids="extract")
+        ti = kwargs['ti']
+        pqts = ti.xcom_pull(task_ids='extract')
+
+        parquet_dirs = list(
+            chain(*(pqts['pqs_to_insert'], pqts['pqs_to_finals'], pqts['pqs_to_update']))
+        )
+
+        if not parquet_dirs:
+            raise AirflowSkipException()

         for dir in parquet_dirs:
             shutil.rmtree(dir, ignore_errors=True)
-            logger.warning(f"{dir} removed")
+            logger.warning(f'{dir} removed')

-    @task(trigger_rule="none_failed")
-    def done(**kwargs) -> None:
-        """This task will fail if any upstream task fails."""
-        ti = kwargs["ti"]
-        print(ti.xcom_pull(key="state", task_ids="upload"))
-        if ti.xcom_pull(key="state", task_ids="upload") == "FAILED":
-            raise ValueError("Force failure because upstream task has failed")
+    end = EmptyOperator(
+        task_id='done',
+        trigger_rule='all_success',
+    )

     # Defining the tasks
-    ini = start()
-    E = download(disease)
-    L = upload(disease)
-    diagnosis = compare_tables_rows()
+    ini = start_task()
+    dbcs = dbcs_to_fetch()
+    E = extract_parquets()
+    upload_new = upload_not_inserted()
+    to_final = update_prelim_to_final()
+    prelims = update_prelim_parquets()
     clean = remove_parquets()
-    end = done()

     # Task flow
-    ini >> E >> L >> diagnosis >> clean >> end
-
-
-def create_dag(
-    disease: str,
-    schedule: str,
-    start: pendulum.datetime,
-):
-    """
-    This method will be responsible for creating a DAG for a
-    SINAN disease. It will receive the disease, its schedule
-    and the start date, returning a DAG with the task flow.
-    """
-    sinan_tag = ["SINAN", "Brasil"]
-    sinan_tag.append(disease)
-    DEFAULT_ARGS.update(start_date=start)
-
-    dag = DAG(
-        "SINAN_" + DISEASES[disease],
-        default_args=DEFAULT_ARGS,  # Tasks and Dags
-        tags=sinan_tag,  # Only DAGs
-        start_date=start,
-        catchup=False,
-        schedule_interval=schedule,
-    )
-
-    with dag:
-        task_flow_for(disease)
-
-    return dag
+    ini >> dbcs >> E >> upload_new >> to_final >> prelims >> clean >> end


 # DAGs
-@dag(
-    "SINAN_METADATA",
-    default_args=DEFAULT_ARGS,
-    tags=["SINAN", "Brasil", "Metadata"],
-    start_date=pendulum.datetime(2022, 2, 1),
-    catchup=False,
-    schedule_interval="@once",
-)
-def metadata_tables():
-    """
-    This DAG will run only once and create a table in the
-    database with each metadata found in the pysus SINAN
-    disease metadata. It will not fail if a metadata is not
-    available, only skip it.
-    """
-
-    @task(task_id="insert_metadata_tables")
-    def metadata_tables():
-        for disease in DISEASES:
-            try:
-                metadata_df = SINAN.metadata_df(disease)
-                pd.DataFrame.to_sql(
-                    metadata_df,
-                    f"sinan_{normalize_str(disease)}_metadata",
-                    con=ENG,
-                    schema=SCHEMA,
-                    if_exists="replace",
-                )
-
-                logger.info(f"Metadata table for {disease} updated.")
-            except Exception:
-                print(f"No metadata available for {disease}")
+# Here is where the DAGs are created; a specific case can be specified
+from airflow.models.dag import DAG
+from epigraphhub.data.brasil.sinan import DISEASES
+from airflow.utils.dag_parsing_context import get_parsing_context
-    meta = metadata_tables()
-    meta
+from random import randint

+current_dag_id = get_parsing_context().dag_id

-dag = metadata_tables()
-# Here its where the DAGs are created, an specific case can be specified
 for disease in DISEASES:
-    # Change DAG variables here
-
-    dag_id = "SINAN_" + DISEASES[disease]
-    globals()[dag_id] = create_dag(
-        disease,
-        schedule="@monthly",
-        start=pendulum.datetime(2022, 2, len(disease)),  # avoid memory overhead
-    )
+    dag_id = 'SINAN_' + DISEASES[disease]
+    if current_dag_id is not None and current_dag_id != dag_id:
+        continue  # skip generation of non-selected DAG
+
+    with DAG(
+        dag_id=dag_id,
+        default_args=DEFAULT_ARGS,
+        tags=['SINAN', 'Brasil', disease],
+        start_date=pendulum.datetime(
+            2022, 2, randint(1, 28)
+        ),
+        catchup=False,
+        schedule='@monthly',
+        dagrun_timeout=timedelta(minutes=10),
+    ):
+        task_flow_for(disease)
diff --git a/containers/airflow/dags/brasil/sinan_drop_table.py b/containers/airflow/dags/brasil/sinan_drop_table.py
new file mode 100644
index 00000000..1f877792
--- /dev/null
+++ b/containers/airflow/dags/brasil/sinan_drop_table.py
@@ -0,0 +1,56 @@
+import pendulum
+import logging as logger
+
+from datetime import timedelta
+from epigraphhub.data.brasil.sinan import normalize_str
+
+from airflow import DAG
+from airflow.decorators import dag
+from airflow.operators.python import PythonOperator
+
+from epigraphhub.settings import env
+from epigraphhub.connection import get_engine
+from epigraphhub.data.brasil.sinan import (
+    DISEASES,
+    normalize_str,
+)
+
+DEFAULT_ARGS = {
+    'owner': 'Admin',
+    'depends_on_past': False,
+    'email': ['epigraphhub@thegraphnetwork.org'],
+    'email_on_failure': True,
+    'email_on_retry': False,
+    'retries': 2,
+    'retry_delay': timedelta(minutes=2),
+}
+
+engine = get_engine(credential_name=env.db.default_credential)
+
+with DAG(
+    dag_id='SINAN_DROP_TABLE',
+    default_args=DEFAULT_ARGS,
+    tags=['SINAN', 'CTL'],
+    start_date=pendulum.datetime(2023, 3, 27),
+    catchup=False,
+    schedule=None,  # Only manually triggered
+    description='A DAG to delete a SINAN table in EGH db',
+    render_template_as_native_obj=True,  # so params.diseases arrives as a list, not a string
+):
+    def drop_tables(diseases: list):
+        for disease in diseases:
+            dis = normalize_str(disease)
+            tablename = 'sinan_' + dis + '_m'
+            with engine.connect() as conn:
+                conn.execute(
+                    f'DROP TABLE brasil.{tablename}'
+                )
+            logger.warning(f'Dropped table {tablename} on schema brasil')
+
+    delete_tables_task = PythonOperator(
+        task_id='drop_table',
+        python_callable=drop_tables,
+        op_kwargs={'diseases': '{{ params.diseases }}'},
+        params={'diseases': []},
+    )
+
+    delete_tables_task
diff --git a/containers/airflow/dags/brasil/sinan_metadata.py b/containers/airflow/dags/brasil/sinan_metadata.py
new file mode 100644
index 00000000..2ab2b80e
--- /dev/null
+++ b/containers/airflow/dags/brasil/sinan_metadata.py
@@ -0,0 +1,66 @@
+import pendulum
+import pandas as pd
+import logging as logger
+
+from datetime import timedelta
+from pysus.online_data import SINAN
+from airflow.decorators import task, dag
+
+from epigraphhub.settings import env
+from epigraphhub.connection import get_engine
+from epigraphhub.data.brasil.sinan import (
+    DISEASES,
+    normalize_str,
+)
+
+DEFAULT_ARGS = {
+    'owner': 'epigraphhub',
+    'depends_on_past': False,
+    'email': ['epigraphhub@thegraphnetwork.org'],
+    'email_on_failure': True,
+    'email_on_retry': False,
+    'retries': 2,
+    'retry_delay': timedelta(minutes=2),
+}
+
+
+@dag(
+    'SINAN_METADATA',
+    default_args=DEFAULT_ARGS,
+    tags=['SINAN', 'Brasil', 'Metadata'],
+    start_date=pendulum.datetime(2022, 2, 1),
+    catchup=False,
+    schedule='@once',
+)
+def metadata_tables():
+    """
+    This DAG will run only once and create a table in the
+    database for each metadata table found in the pysus SINAN
+    disease metadata. It will not fail if metadata is not
+    available; it will only skip it.
+    """
+
+    engine = get_engine(credential_name=env.db.default_credential)
+
+    @task(task_id='insert_metadata_tables')
+    def metadata_tables():
+        for disease in DISEASES:
+            try:
+                metadata_df = SINAN.metadata_df(disease)
+                pd.DataFrame.to_sql(
+                    metadata_df,
+                    f'sinan_{normalize_str(disease)}_metadata',
+                    con=engine,
+                    schema='brasil',
+                    if_exists='replace',
+                )
+
+                logger.info(f'Metadata table for {disease} updated.')
+            except Exception:
+                print(f'No metadata available for {disease}')
+
+    meta = metadata_tables()
+    meta
+
+
+dag = metadata_tables()
diff --git a/containers/airflow/dags/colombia_dag.py b/containers/airflow/dags/colombia_dag.py
index b7b95be6..70660263 100644
--- a/containers/airflow/dags/colombia_dag.py
+++ b/containers/airflow/dags/colombia_dag.py
@@ -23,7 +23,7 @@


 @dag(
-    schedule_interval="@daily",
+    schedule="@daily",
     default_args=default_args,
     catchup=False,
 )
diff --git a/containers/airflow/dags/covidch_dash_for_dag.py b/containers/airflow/dags/covidch_dash_for_dag.py
index 7d70eabb..d45014db 100644
--- a/containers/airflow/dags/covidch_dash_for_dag.py
+++ b/containers/airflow/dags/covidch_dash_for_dag.py
@@ -43,7 +43,7 @@


 @dag(
-    schedule_interval="@weekly",
+    schedule="@weekly",
     default_args=default_args,
     catchup=False,
 )
@@ -164,4 +164,4 @@ def up_for_total_icu():

     start >> up_for_new_hosp() >> up_for_total_hosp() >> up_for_total_icu() >> end

-dag = update_covidch()
\ No newline at end of file
+dag = update_covidch()
diff --git a/containers/airflow/dags/covidch_dash_train_dag.py b/containers/airflow/dags/covidch_dash_train_dag.py
index f181858b..75518471 100644
--- a/containers/airflow/dags/covidch_dash_train_dag.py
+++ b/containers/airflow/dags/covidch_dash_train_dag.py
@@ -45,7 +45,7 @@


 @dag(
-    schedule_interval=timedelta(days =60),
+    schedule=timedelta(days=60),
     default_args=default_args,
     catchup=False,
 )
@@ -157,4 +157,4 @@ def train(canton,

     start >> train_new_hosp >> train_total_hosp >> train_total_icu >> end

-dag = train_covidch()
\ No newline at end of file
+dag = train_covidch()
diff --git a/containers/airflow/dags/foph_dag.py b/containers/airflow/dags/foph_dag.py
index 003cad6d..6ab8f1e3 100644
--- a/containers/airflow/dags/foph_dag.py
+++ b/containers/airflow/dags/foph_dag.py
@@ -95,7 +95,7 @@


 @dag(
-    schedule_interval="@weekly",
+    schedule="@weekly",
     default_args=default_args,
     catchup=False,
     tags = ['CHE', 'FOPH', 'Switzerland']
diff --git a/containers/airflow/dags/owid_dag.py b/containers/airflow/dags/owid_dag.py
index 77701984..a732e5c8 100644
--- a/containers/airflow/dags/owid_dag.py
+++ b/containers/airflow/dags/owid_dag.py
@@ -79,7 +79,7 @@


 @dag(
-    schedule_interval="@daily",
+    schedule="@daily",
     default_args=default_args,
     catchup=False,
     tags=['OWID']
diff --git a/containers/airflow/dags/tests/webservices_status_dag.py b/containers/airflow/dags/tests/webservices_status_dag.py
index f7fb3c1c..08a957a9 100644
--- a/containers/airflow/dags/tests/webservices_status_dag.py
+++ b/containers/airflow/dags/tests/webservices_status_dag.py
@@ -23,7 +23,7 @@


 @dag(
-    schedule_interval="@daily",
+    schedule="@daily",
     default_args=default_args,
     catchup=False,
 )
diff --git a/containers/airflow/scripts/startup.sh b/containers/airflow/scripts/startup.sh
index b4daa6dd..3602a3e4 100755
--- a/containers/airflow/scripts/startup.sh
+++ b/containers/airflow/scripts/startup.sh
@@ -20,6 +20,9 @@ echo "========= airflow scheduler ========="
 airflow scheduler &
 sleep 10

+airflow dags reserialize
+sleep 5
+
 # just to keep the prompt blocked
 mkdir -p /tmp/empty
 cd /tmp/empty
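# --------------------------------------------------------------------------
# Illustrative sketch (not part of the patch): several tasks in the SINAN DAG
# raise AirflowSkipException when there is nothing to do, and the tasks that
# follow them declare trigger_rule='all_done', so a skip (or a failure)
# upstream does not block the later update and cleanup steps. A minimal,
# self-contained version of that combination, with hypothetical task names:
from airflow.decorators import task
from airflow.exceptions import AirflowSkipException


@task
def maybe_insert(new_files: list):
    if not new_files:
        # marks this task instance as 'skipped' rather than 'failed'
        raise AirflowSkipException('nothing to insert')
    ...  # do the insertion here


@task(trigger_rule='all_done')
def cleanup():
    # runs once every upstream task has finished,
    # whichever state (success, skipped, failed) it ended in
    ...

# inside a DAG context these would be wired as: maybe_insert(files) >> cleanup()
# --------------------------------------------------------------------------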