From acf5e751981fd624eb37a2eb76d431de008021d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lu=C3=A3=20Bida=20Vacaro?= Date: Mon, 27 Mar 2023 10:54:37 -0300 Subject: [PATCH] Minor fixes --- containers/airflow/dags/brasil/sinan.py | 26 ++++++++++++------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/containers/airflow/dags/brasil/sinan.py b/containers/airflow/dags/brasil/sinan.py index 09486f08..314db7c0 100644 --- a/containers/airflow/dags/brasil/sinan.py +++ b/containers/airflow/dags/brasil/sinan.py @@ -120,7 +120,7 @@ def dbcs_to_fetch() -> dict: ) years = cur.all() except Exception as e: - if "UndefinedColumn" in str(e): + if "UndefinedColumn" or "NoSuchTableError" in str(e): years = [] db_years.extend(list(chain(*years))) # Compare years found in ctl table with FTP server @@ -191,10 +191,10 @@ def upload_not_inserted(**kwargs) -> dict: def insert_parquerts(stage): parquets = finals or [] if stage == 'finals' else prelims or [] prelim = False if stage == 'finals' else True - + print(parquets) for parquet in parquets: year = get_year(parquet) - df = viz.parquet(parquet) + df = viz.parquet(str(parquet)) if df.empty: raise ValueError('DataFrame is empty') @@ -210,19 +210,17 @@ def insert_parquerts(stage): sql_dtypes = { 'int64': Integer, 'float64': Float, + 'string': String, 'object': String, 'datetime64[ns]': DateTime, } - metadata = MetaData() - metadata.reflect(bind=engine) - table = Table( - f'{schema}.{tablename}', - metadata, - autoload=True, - autoload_with=engine - ) - - tcolumns = [column.name for column in table.columns] + + with engine.connect() as conn: + cur = conn.execute( + f'SELECT * FROM {schema}.{tablename} LIMIT 0' + ) + tcolumns = cur.keys() + newcols = [c for c in df.columns if c not in tcolumns] insert_cols_query = f'ALTER TABLE {schema}.{tablename}' @@ -250,7 +248,7 @@ def insert_parquerts(stage): f' WHERE year = {year}' ) inserted_rows[year] = cur.fetchone()[0] - + if finals: insert_parquerts('finals') if prelims: