diff --git a/backend/tno/db.py b/backend/tno/db.py
index 2f1ed0a0..13f42335 100644
--- a/backend/tno/db.py
+++ b/backend/tno/db.py
@@ -28,7 +28,7 @@ class DBBase:
-    def __init__(self, db_name: str = "default", pool=False):
+    def __init__(self, pool=False, db_name: str = "default"):
         self.db_name = db_name
diff --git a/backend/tno/fixtures/initial_data.yaml b/backend/tno/fixtures/initial_data.yaml
index af0db435..894d08da 100644
--- a/backend/tno/fixtures/initial_data.yaml
+++ b/backend/tno/fixtures/initial_data.yaml
@@ -33,8 +33,8 @@
     name: gaia_dr3
     display_name: GAIA DR3
     database: prod_gavo
-    schema: gaia
-    tablename: dr3
+    schema: gaia_dr3
+    tablename: source
     ra_property: ra
     dec_property: dec
     registration_date: 2024-04-08 12:00:00.554084+00:00
diff --git a/backend/tno/migrations/0007_occultation_updated_at.py b/backend/tno/migrations/0007_occultation_updated_at.py
new file mode 100644
index 00000000..81bb4d77
--- /dev/null
+++ b/backend/tno/migrations/0007_occultation_updated_at.py
@@ -0,0 +1,18 @@
+# Generated by Django 3.2.18 on 2024-08-29 19:25
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ("tno", "0006_auto_20240822_1847"),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name="occultation",
+            name="updated_at",
+            field=models.DateTimeField(auto_now=True, verbose_name="Updated at"),
+        ),
+    ]
diff --git a/backend/tno/models/occultation.py b/backend/tno/models/occultation.py
index 4fdff12f..4201672a 100644
--- a/backend/tno/models/occultation.py
+++ b/backend/tno/models/occultation.py
@@ -893,6 +893,9 @@ class Occultation(models.Model):
     # this field always represents the time of the last update of this occultation
     # event.
     created_at = models.DateTimeField(verbose_name="Created at", auto_now_add=True)
+    # Record update timestamp.
+    # After the hash_id is created, records can still be updated.
+    updated_at = models.DateTimeField(verbose_name="Updated at", auto_now=True)

     job_id = models.IntegerField(
         verbose_name="Prediction Job",
diff --git a/predict_occultation/src/asteroid/asteroid.py b/predict_occultation/src/asteroid/asteroid.py
index 4c17584b..11d7ee61 100644
--- a/predict_occultation/src/asteroid/asteroid.py
+++ b/predict_occultation/src/asteroid/asteroid.py
@@ -844,9 +844,6 @@ def register_occultations(self, start_period: str, end_period: str, jobid: int):
         try:
             dao = OccultationDao(log=log)

-            # Deletes the occultations already registered for this asteroid before inserting.
-            # IMPORTANT! deletes them even if no results were generated.
-            dao.delete_by_asteroid_name_period(self.name, start_period, end_period)

             if "filename" not in self.predict_occultation:
                 log.warning("There is no file with the predictions.")
@@ -862,7 +859,6 @@ def register_occultations(self, start_period: str, end_period: str, jobid: int):
                 return 0

             # Reads the occultation table file and creates a dataframe
-            # occultation_date;ra_star_candidate;dec_star_candidate;ra_object;dec_object;ca;pa;vel;delta;g;j;h;k;long;loc_t;off_ra;off_de;pm;ct;f;e_ra;e_de;pmra;pmde
             df = pd.read_csv(
                 predict_table_path,
                 delimiter=";",
@@ -988,20 +984,9 @@ def register_occultations(self, start_period: str, end_period: str, jobid: int):
             # ATTENTION! Overwrites the occultation_table.csv file
             df.to_csv(predict_table_path, index=False, sep=";")

-            data = StringIO()
-            df.to_csv(
-                data,
-                sep="|",
-                header=True,
-                index=False,
-            )
-            data.seek(0)
-
-            # rowcount = dao.import_occultations(list(df.columns), data)
-            rowcount = dao.import_occultations(data)
+            rowcount = dao.upinsert_occultations(df)

             del df
-            del data
             del dao

             self.ingest_occultations.update({"count": rowcount})
diff --git a/predict_occultation/src/dao/occultation.py b/predict_occultation/src/dao/occultation.py
index d00a8fc0..86252933 100644
--- a/predict_occultation/src/dao/occultation.py
+++ b/predict_occultation/src/dao/occultation.py
@@ -1,7 +1,24 @@
+# from sqlalchemy.dialects import postgresql
+import pandas as pd
 from dao.db_base import DBBase
+from sqlalchemy.dialects.postgresql import insert
 from sqlalchemy.sql import and_, delete


+def occultations_upsert(table, conn, keys, data_iter):
+
+    data = [dict(zip(keys, row)) for row in data_iter]
+
+    insert_statement = insert(table.table).values(data)
+    upsert_statement = insert_statement.on_conflict_do_update(
+        constraint="tno_occultation_hash_id_key",
+        set_={c.key: c for c in insert_statement.excluded},
+    )
+    # print(upsert_statement.compile(dialect=postgresql.dialect()))
+    result = conn.execute(upsert_statement)
+    return result.rowcount
+
+
 class OccultationDao(DBBase):
     def __init__(self, log):
         super(OccultationDao, self).__init__()
@@ -58,3 +75,20 @@ def import_occultations(self, data):

         rowcount = self.import_with_copy_expert(sql, data)
         return rowcount
+
+    def upinsert_occultations(self, df):
+
+        # Add updated_at column
+        df["updated_at"] = pd.to_datetime("now", utc=True)
+
+        engine = self.get_db_engine()
+        with engine.connect() as conn:
+            rowcount = df.to_sql(
+                "tno_occultation",
+                con=conn,
+                if_exists="append",
+                method=occultations_upsert,
+                index=False,
+            )
+
+        return rowcount
diff --git a/predict_occultation/src/teste.py b/predict_occultation/src/teste.py
index d4527baa..71573267 100644
--- a/predict_occultation/src/teste.py
+++ b/predict_occultation/src/teste.py
@@ -6,13 +6,15 @@
 from pathlib import Path

 import colorlog
+import pandas as pd
 from asteroid import Asteroid
+from dao.occultation import OccultationDao

-log = logging.getLogger("teste")
-log.setLevel(logging.DEBUG)
-consoleFormatter = logging.Formatter("[%(levelname)s] %(message)s")
-consoleHandler = logging.StreamHandler(sys.stdout)
-log.addHandler(consoleHandler)
+# log = logging.getLogger("teste")
+# log.setLevel(logging.DEBUG)
+# consoleFormatter = logging.Formatter("[%(levelname)s] %(message)s")
+# consoleHandler = logging.StreamHandler(sys.stdout)
+# log.addHandler(consoleHandler)


 log = colorlog.getLogger("teste")
@@ -24,14 +26,49 @@

 log.info("------- Asteroid Class -------")

-base_path = Path("/app/outputs/predict_occultations/5/asteroids/2006BK86")
-
-from predict_occultation.pipeline.occ_path_coeff import run_occultation_path_coeff
+base_path = Path("/app/outputs/predict_occultations/2/asteroids/Chiron")

 pred_table = base_path.joinpath("occultation_table.csv")
-obj_dict = json.load(base_path.joinpath("2006BK86.json").open())
-mag_and_uncert = base_path.joinpath("apmag_and_uncertainties.json")
-run_occultation_path_coeff(pred_table, obj_dict, mag_and_uncert)
+print(pred_table)
+df = pd.read_csv(
+    pred_table,
+    delimiter=";",
+)
+
+# def postgres_upsert(table, conn, keys, data_iter):
+#     from sqlalchemy.dialects.postgresql import insert
+#     from sqlalchemy.dialects import postgresql
+
+#     data = [dict(zip(keys, row)) for row in data_iter]
+#     print(data)
+
+#     insert_statement = insert(table.table).values(data)
+#     upsert_statement = insert_statement.on_conflict_do_update(
+#         constraint=f"tno_occultation_hash_id_key",
+#         set_={c.key: c for c in insert_statement.excluded},
+#     )
+#     # print(upsert_statement.compile(dialect=postgresql.dialect()))
+#     conn.execute(upsert_statement)
+
+
+print(df.head(5))
+dao = OccultationDao(log=log)
+engine = dao.get_db_engine()
+rowcount = dao.upinsert_occultations(df)
+print(rowcount)
+# with engine.connect() as conn:
+#     df.to_sql(
+#         "tno_occultation",
+#         con=conn,
+#         if_exists="append",
+#         method=postgres_upsert,
+#         index=False,
+#     )
+
+
+# obj_dict = json.load(base_path.joinpath("2006BK86.json").open())
+# mag_and_uncert = base_path.joinpath("apmag_and_uncertainties.json")
+# run_occultation_path_coeff(pred_table, obj_dict, mag_and_uncert)

 # try:
 #     a = Asteroid(
@@ -124,3 +161,16 @@
 # # md5 = hashlib.md5(a.encode('utf-8')).digest()
 # # hash = base64.urlsafe_b64encode(md5).decode('utf-8').rstrip("=")
 # # print(hash)
+
+
+# def postgres_upsert(table, conn, keys, data_iter):
+#     from sqlalchemy.dialects.postgresql import insert
+
+#     data = [dict(zip(keys, row)) for row in data_iter]
+
+#     insert_statement = insert(table.table).values(data)
+#     upsert_statement = insert_statement.on_conflict_do_update(
+#         constraint=f"{table.table.name}_pkey",
+#         set_={c.key: c for c in insert_statement.excluded},
+#     )
+#     conn.execute(upsert_statement)
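
For reference, a minimal, self-contained sketch of the upsert path this change introduces: pandas DataFrame.to_sql with a method= callback that issues PostgreSQL INSERT ... ON CONFLICT DO UPDATE against the tno_occultation_hash_id_key unique constraint, so re-running a prediction job updates existing rows instead of duplicating or deleting them. The connection URL and CSV path below are illustrative assumptions only; in the pipeline the DataFrame comes from the prediction job and the engine from DBBase.get_db_engine().

# Minimal sketch of the to_sql(method=...) upsert hook (illustrative values only).
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.dialects.postgresql import insert


def occultations_upsert(table, conn, keys, data_iter):
    # pandas calls this once per chunk with the target table wrapper, the open
    # connection, the column names and an iterator of row tuples.
    data = [dict(zip(keys, row)) for row in data_iter]
    insert_stmt = insert(table.table).values(data)
    upsert_stmt = insert_stmt.on_conflict_do_update(
        # Rows whose hash_id already exists are updated in place.
        constraint="tno_occultation_hash_id_key",
        set_={c.key: c for c in insert_stmt.excluded},
    )
    return conn.execute(upsert_stmt).rowcount


# Illustrative connection string and input file (not taken from this changeset).
engine = create_engine("postgresql+psycopg2://user:password@localhost:5432/tno")
df = pd.read_csv("occultation_table.csv", delimiter=";")
df["updated_at"] = pd.to_datetime("now", utc=True)

with engine.connect() as conn:
    # Recent pandas returns the row count reported by the method hook.
    rowcount = df.to_sql(
        "tno_occultation",
        con=conn,
        if_exists="append",
        method=occultations_upsert,
        index=False,
    )
print(rowcount)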