Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Insert/Update predict events #1059

Merged
merged 2 commits into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion backend/tno/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

class DBBase:

def __init__(self, db_name: str = "default", pool=False):
def __init__(self, pool=False, db_name: str = "default"):

self.db_name = db_name

Expand Down
4 changes: 2 additions & 2 deletions backend/tno/fixtures/initial_data.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
name: gaia_dr3
display_name: GAIA DR3
database: prod_gavo
schema: gaia
tablename: dr3
schema: gaia_dr3
tablename: source
ra_property: ra
dec_property: dec
registration_date: 2024-04-08 12:00:00.554084+00:00
18 changes: 18 additions & 0 deletions backend/tno/migrations/0007_occultation_updated_at.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 3.2.18 on 2024-08-29 19:25

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
("tno", "0006_auto_20240822_1847"),
]

operations = [
migrations.AddField(
model_name="occultation",
name="updated_at",
field=models.DateTimeField(auto_now=True, verbose_name="Updated at"),
),
]
3 changes: 3 additions & 0 deletions backend/tno/models/occultation.py
Original file line number Diff line number Diff line change
Expand Up @@ -893,6 +893,9 @@ class Occultation(models.Model):
# este campo sempre representa o momento da ultima atualização deste evento de
# ocultação.
created_at = models.DateTimeField(verbose_name="Created at", auto_now_add=True)
# Data de atualização do registro,
# Após a criação do hash_id, os registro podem ser atualizadas.
updated_at = models.DateTimeField(verbose_name="Updated at", auto_now=True)

job_id = models.IntegerField(
verbose_name="Prediction Job",
Expand Down
17 changes: 1 addition & 16 deletions predict_occultation/src/asteroid/asteroid.py
Original file line number Diff line number Diff line change
Expand Up @@ -844,9 +844,6 @@ def register_occultations(self, start_period: str, end_period: str, jobid: int):

try:
dao = OccultationDao(log=log)
# Apaga as occultations já registradas para este asteroid antes de inserir.
# IMPORTANTE! apaga mesmo que não tenham sido gerados resultados.
dao.delete_by_asteroid_name_period(self.name, start_period, end_period)

if "filename" not in self.predict_occultation:
log.warning("There is no file with the predictions.")
Expand All @@ -862,7 +859,6 @@ def register_occultations(self, start_period: str, end_period: str, jobid: int):
return 0

# Le o arquivo occultation table e cria um dataframe
# occultation_date;ra_star_candidate;dec_star_candidate;ra_object;dec_object;ca;pa;vel;delta;g;j;h;k;long;loc_t;off_ra;off_de;pm;ct;f;e_ra;e_de;pmra;pmde
df = pd.read_csv(
predict_table_path,
delimiter=";",
Expand Down Expand Up @@ -988,20 +984,9 @@ def register_occultations(self, start_period: str, end_period: str, jobid: int):
# ATENCAO! Sobrescreve o arquivo occultation_table.csv
df.to_csv(predict_table_path, index=False, sep=";")

data = StringIO()
df.to_csv(
data,
sep="|",
header=True,
index=False,
)
data.seek(0)

# rowcount = dao.import_occultations(list(df.columns), data)
rowcount = dao.import_occultations(data)
rowcount = dao.upinsert_occultations(df)

del df
del data
del dao

self.ingest_occultations.update({"count": rowcount})
Expand Down
34 changes: 34 additions & 0 deletions predict_occultation/src/dao/occultation.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,24 @@
# from sqlalchemy.dialects import postgresql
import pandas as pd
from dao.db_base import DBBase
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.sql import and_, delete


def occultations_upsert(table, conn, keys, data_iter):

data = [dict(zip(keys, row)) for row in data_iter]

insert_statement = insert(table.table).values(data)
upsert_statement = insert_statement.on_conflict_do_update(
constraint=f"tno_occultation_hash_id_key",
set_={c.key: c for c in insert_statement.excluded},
)
# print(upsert_statement.compile(dialect=postgresql.dialect()))
result = conn.execute(upsert_statement)
return result.rowcount


class OccultationDao(DBBase):
def __init__(self, log):
super(OccultationDao, self).__init__()
Expand Down Expand Up @@ -58,3 +75,20 @@ def import_occultations(self, data):
rowcount = self.import_with_copy_expert(sql, data)

return rowcount

def upinsert_occultations(self, df):

# Add updated_at column
df["updated_at"] = pd.to_datetime("now", utc=True)

engine = self.get_db_engine()
with engine.connect() as conn:
rowcount = df.to_sql(
"tno_occultation",
con=conn,
if_exists="append",
method=occultations_upsert,
index=False,
)

return rowcount
72 changes: 61 additions & 11 deletions predict_occultation/src/teste.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@
from pathlib import Path

import colorlog
import pandas as pd
from asteroid import Asteroid
from dao.occultation import OccultationDao

log = logging.getLogger("teste")
log.setLevel(logging.DEBUG)
consoleFormatter = logging.Formatter("[%(levelname)s] %(message)s")
consoleHandler = logging.StreamHandler(sys.stdout)
log.addHandler(consoleHandler)
# log = logging.getLogger("teste")
# log.setLevel(logging.DEBUG)
# consoleFormatter = logging.Formatter("[%(levelname)s] %(message)s")
# consoleHandler = logging.StreamHandler(sys.stdout)
# log.addHandler(consoleHandler)


log = colorlog.getLogger("teste")
Expand All @@ -24,14 +26,49 @@

log.info("------- Asteroid Class -------")

base_path = Path("/app/outputs/predict_occultations/5/asteroids/2006BK86")

from predict_occultation.pipeline.occ_path_coeff import run_occultation_path_coeff
base_path = Path("/app/outputs/predict_occultations/2/asteroids/Chiron")

pred_table = base_path.joinpath("occultation_table.csv")
obj_dict = json.load(base_path.joinpath("2006BK86.json").open())
mag_and_uncert = base_path.joinpath("apmag_and_uncertainties.json")
run_occultation_path_coeff(pred_table, obj_dict, mag_and_uncert)
print(pred_table)
df = pd.read_csv(
pred_table,
delimiter=";",
)

# def postgres_upsert(table, conn, keys, data_iter):
# from sqlalchemy.dialects.postgresql import insert
# from sqlalchemy.dialects import postgresql

# data = [dict(zip(keys, row)) for row in data_iter]
# print(data)

# insert_statement = insert(table.table).values(data)
# upsert_statement = insert_statement.on_conflict_do_update(
# constraint=f"tno_occultation_hash_id_key",
# set_={c.key: c for c in insert_statement.excluded},
# )
# # print(upsert_statement.compile(dialect=postgresql.dialect()))
# conn.execute(upsert_statement)


print(df.head(5))
dao = OccultationDao(log=log)
engine = dao.get_db_engine()
rowcount = dao.upinsert_occultations(df)
print(rowcount)
# with engine.connect() as conn:
# df.to_sql(
# "tno_occultation",
# con=conn,
# if_exists="append",
# method=postgres_upsert,
# index=False,
# )


# obj_dict = json.load(base_path.joinpath("2006BK86.json").open())
# mag_and_uncert = base_path.joinpath("apmag_and_uncertainties.json")
# run_occultation_path_coeff(pred_table, obj_dict, mag_and_uncert)

# try:
# a = Asteroid(
Expand Down Expand Up @@ -124,3 +161,16 @@
# # md5 = hashlib.md5(a.encode('utf-8')).digest()
# # hash = base64.urlsafe_b64encode(md5).decode('utf-8').rstrip("=")
# # print(hash)


# def postgres_upsert(table, conn, keys, data_iter):
# from sqlalchemy.dialects.postgresql import insert

# data = [dict(zip(keys, row)) for row in data_iter]

# insert_statement = insert(table.table).values(data)
# upsert_statement = insert_statement.on_conflict_do_update(
# constraint=f"{table.table.name}_pkey",
# set_={c.key: c for c in insert_statement.excluded},
# )
# conn.execute(upsert_statement)
Loading