Skip to content

Commit

Permalink
Fix import occ (#840)
Browse files Browse the repository at this point in the history
* Changes parsl config for running in cluster

* Fixed run in cluster
  • Loading branch information
glaubervila authored Feb 16, 2024
1 parent c44fd72 commit b820608
Show file tree
Hide file tree
Showing 11 changed files with 204 additions and 256 deletions.
1 change: 1 addition & 0 deletions .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
"mrmlnc.vscode-duplicate",
"visualstudioexptteam.vscodeintellicode",
"visualstudioexptteam.intellicode-api-usage-examples",
"eamodio.gitlens",
// python
"ms-python.python",
"ms-python.vscode-pylance",
Expand Down
8 changes: 6 additions & 2 deletions predict_occultation/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,12 @@ RUN groupadd --gid ${USERGID} ${USERNAME} \
ENV PIPELINE_ROOT=${APP_HOME}/src
ENV PIPELINE_PREDIC_OCC=${PIPELINE_ROOT}/predict_occultation
ENV PIPELINE_PATH=${PIPELINE_PREDIC_OCC}/pipeline
ENV WORKFLOW_PATH=${PIPELINE_ROOT}
ENV EXECUTION_PATH=${PIPELINE_ROOT}
ENV PREDICT_OUTPUTS=${APP_HOME}/outputs/predict_occultation

# When parsl_env = local, the remote directory is the same as the local one.
# But when parsl_env = linea, the remote directory is e.g.: /lustre/t1/scratch/users/app.tno/tno_testing
ENV REMOTE_PIPELINE_ROOT=$PIPELINE_ROOT


COPY --chown=${USERNAME}:ton --chmod=775 src /app/src
COPY --chown=${USERNAME}:ton --chmod=775 entrypoint.sh ${APP_HOME}
Expand Down
3 changes: 2 additions & 1 deletion predict_occultation/entrypoint.sh
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
#!/bin/bash --login
umask ug=rwx,o=r

echo "Running Rsync: ${PIPELINE_PREDIC_OCC}"
rsync -r /app/src/predict_occultation/ ${PIPELINE_PREDIC_OCC}/
rsync -r /app/src/predict_occultation/ ${REMOTE_PIPELINE_ROOT}/predict_occultation


# Download the bsp planetary and leap_second files if they do not exist yet.
Expand Down
27 changes: 11 additions & 16 deletions predict_occultation/src/asteroid/asteroid.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,14 @@
import pathlib
from datetime import datetime as dt
from datetime import timezone

# import numpy as np
from io import StringIO

# from occviz import occultation_path_coeff
from typing import Optional

# import csv
import pandas as pd
from asteroid.external_inputs import AsteroidExternalInputs
from asteroid.jpl import findSPKID, get_bsp_from_jpl
from dao import AsteroidDao, ObservationDao, OccultationDao
from library import dec2DMS # ra_hms_to_deg,; dec_hms_to_deg,
from library import date_to_jd, has_expired, ra2HMS
from library import date_to_jd, dec2DMS, has_expired, ra2HMS


def serialize(obj):
Expand Down Expand Up @@ -174,20 +168,21 @@ def get_log(self):

def get_base_path(self):
base_path = self.__BASE_PATH
if not base_path:
# Carrega as variaveis de configuração do arquivo config.ini
config = configparser.ConfigParser()
config.read(os.path.join(os.environ["EXECUTION_PATH"], "config.ini"))
base_path = config["DEFAULT"].get("AsteroidPath")
self.__BASE_PATH = base_path
# if not base_path:
# # Carrega as variaveis de configuração do arquivo config.ini
# config = configparser.ConfigParser()
# config.read(os.path.join(os.environ["EXECUTION_PATH"], "config.ini"))
# base_path = config["DEFAULT"].get("AsteroidPath")
# self.__BASE_PATH = base_path

return base_path

def get_jpl_email(self):
# Carrega as variaveis de configuração do arquivo config.ini
config = configparser.ConfigParser()
config.read(os.path.join(os.environ["EXECUTION_PATH"], "config.ini"))
JPL_EMAIL = config["DEFAULT"].get("JplEmail", "sso-portal@linea.gov.br")
# config = configparser.ConfigParser()
# config.read(os.path.join(os.environ["EXECUTION_PATH"], "config.ini"))
# JPL_EMAIL = config["DEFAULT"].get("JplEmail", "sso-portal@linea.gov.br")
JPL_EMAIL = "sso-portal@linea.gov.br"
return JPL_EMAIL

def get_or_create_dir(self):
Expand Down
5 changes: 0 additions & 5 deletions predict_occultation/src/dao/db_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ def get_db_uri(self):
# DB_USER, DB_PASS, DB_HOST, DB_PORT, DB_NAME
# db_uri = "postgresql+psycopg2://%s:%s@%s:%s/%s" % (
# "postgres", "postgres", "172.18.0.2", "5432", "tno_v2")

# DB_URI=postgresql+psycopg2://USER:PASS@HOST:PORT/DB_NAME
try:
db_uri = os.environ["DB_URI_ADMIN"]
Expand All @@ -34,10 +33,6 @@ def get_db_uri(self):
)

def get_db_engine(self):
# Carrega as variaveis de configuração do arquivo config.ini
config = configparser.ConfigParser()
config.read(os.path.join(os.environ["EXECUTION_PATH"], "config.ini"))

engine = create_engine(self.get_db_uri(), poolclass=NullPool)

return engine
Expand Down
134 changes: 0 additions & 134 deletions predict_occultation/src/library.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,140 +59,6 @@ def count_lines(filepath):
return num_lines


def ingest_occultations(asteroid_id, name, number, filepath, start_period, end_period):
    """Load a predicted-occultation table file into the occultation database.

    Reads the semicolon-separated occultation table at *filepath*, converts
    the star/target coordinates to degrees, tags every row with the
    asteroid's id/name/number, and bulk-imports the result through
    OccultationDao. Occultations already registered for this asteroid in the
    [start_period, end_period] window are deleted first.

    Args:
        asteroid_id: Database id of the asteroid the events belong to.
        name: Asteroid name, stored on every row.
        number: Asteroid number, stored on every row.
        filepath: Path to the ';'-separated occultation table (one header row,
            which is skipped).
        start_period: Start of the period whose previous records are purged.
        end_period: End of the period whose previous records are purged.

    Returns:
        Number of rows imported, as reported by the DAO.
    """

    from io import StringIO

    import pandas as pd
    from dao import OccultationDao
    from library import dec_hms_to_deg, ra_hms_to_deg

    dao = OccultationDao()

    # Delete the occultations already registered for this asteroid before inserting.
    dao.delete_by_asteroid_id(asteroid_id, start_period, end_period)

    # Read the occultation table file and create a dataframe.
    # occultation_date;ra_star_candidate;dec_star_candidate;ra_object;dec_object;ca;pa;vel;delta;g;j;h;k;long;loc_t;off_ra;off_de;pm;ct;f;e_ra;e_de;pmra;pmde
    df = pd.read_csv(
        filepath,
        delimiter=";",
        header=None,
        skiprows=1,
        names=[
            "occultation_date",
            "ra_star_candidate",
            "dec_star_candidate",
            "ra_object",
            "dec_object",
            "ca",
            "pa",
            "vel",
            "delta",
            "g",
            "j",
            "h",
            "k",
            "long",
            "loc_t",
            "off_ra",
            "off_de",
            "pm",
            "ct",
            "f",
            "e_ra",
            "e_de",
            "pmra",
            "pmde",
        ],
    )

    # Add target and star coordinate columns converted to degrees.
    df["ra_target_deg"] = df["ra_object"].apply(ra_hms_to_deg)
    df["dec_target_deg"] = df["dec_object"].apply(dec_hms_to_deg)
    df["ra_star_deg"] = df["ra_star_candidate"].apply(ra_hms_to_deg)
    df["dec_star_deg"] = df["dec_star_candidate"].apply(dec_hms_to_deg)

    # Add columns for asteroid id, name and number.
    df["name"] = name
    df["number"] = number
    df["asteroid_id"] = asteroid_id

    # Remove placeholder values such as -- or -.
    df["ct"] = df["ct"].str.replace("--", "")
    df["f"] = df["f"].str.replace("-", "")

    # Rename the columns to the database table's column names.
    df = df.rename(
        columns={
            "occultation_date": "date_time",
            "ra_object": "ra_target",
            "dec_object": "dec_target",
            "ca": "closest_approach",
            "pa": "position_angle",
            "vel": "velocity",
            "off_de": "off_dec",
            "pm": "proper_motion",
            "f": "multiplicity_flag",
            "e_de": "e_dec",
            "pmde": "pmdec",
        }
    )

    # Reorder the columns to match the order of the database table.
    df = df.reindex(
        columns=[
            "name",
            "number",
            "date_time",
            "ra_star_candidate",
            "dec_star_candidate",
            "ra_target",
            "dec_target",
            "closest_approach",
            "position_angle",
            "velocity",
            "delta",
            "g",
            "j",
            "h",
            "k",
            "long",
            "loc_t",
            "off_ra",
            "off_dec",
            "proper_motion",
            "ct",
            "multiplicity_flag",
            "e_ra",
            "e_dec",
            "pmra",
            "pmdec",
            "ra_star_deg",
            "dec_star_deg",
            "ra_target_deg",
            "dec_target_deg",
            "asteroid_id",
        ]
    )

    # Serialize as a '|'-separated CSV in memory for the DAO bulk import.
    data = StringIO()
    df.to_csv(
        data,
        sep="|",
        header=True,
        index=False,
    )
    data.seek(0)

    rowcount = dao.import_occultations(data)

    # Release the (potentially large) intermediates eagerly.
    del df
    del data

    return rowcount


def has_expired(date, days=60):
from datetime import datetime

Expand Down
125 changes: 125 additions & 0 deletions predict_occultation/src/parsl_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import os
from pathlib import Path

from parsl.channels import SSHChannel
from parsl.config import Config
from parsl.executors import HighThroughputExecutor
from parsl.launchers import SrunLauncher
from parsl.providers import LocalProvider, SlurmProvider


def get_config(key, jobpath):
    """
    Create an instance of the Parsl configuration.

    The execution environment is selected by the PARSL_ENV environment
    variable ("local" by default) and must match *key*.

    Args:
        key (str): The key of the configuration to be returned.
            Options are: 'local' or 'linea'.
        jobpath (str): Job directory path. Currently unused; kept for
            interface compatibility with existing callers.

    Returns:
        Config: Parsl config instance.

    Raises:
        Exception: If a required environment variable (REMOTE_PIPELINE_ROOT,
            PREDICT_OUTPUTS, SSHKEY, DB_URI) is missing when PARSL_ENV=linea.
        KeyError: If *key* does not name an executor built for the current
            PARSL_ENV.
    """

    parsl_env = os.getenv("PARSL_ENV", "local")

    # Built per-environment below. Pre-declaring the mapping guarantees a
    # defined name even when PARSL_ENV holds an unexpected value (the
    # original code raised a confusing NameError in that case).
    executors = {}

    if parsl_env == "linea":
        # Check LIneA PATH envs.
        pipeline_root = os.getenv("REMOTE_PIPELINE_ROOT", None)
        if not pipeline_root:
            raise Exception(
                "It is necessary to configure the REMOTE_PIPELINE_ROOT variable when PARSL_ENV=linea"
            )

        pipeline_root = Path(pipeline_root)
        condapath = pipeline_root.joinpath("miniconda/bin")
        pipeline_pred_occ = pipeline_root.joinpath("predict_occultation")
        pipeline_path = pipeline_root.joinpath("predict_occultation/pipeline")

        # Outputs directory.
        predict_outputs = os.getenv("PREDICT_OUTPUTS", None)
        if not predict_outputs:
            raise Exception(
                "It is necessary to configure the PREDICT_OUTPUTS variable when PARSL_ENV=linea"
            )
        predict_outputs = Path(predict_outputs)
        predict_outputs.mkdir(parents=True, exist_ok=True)

        # LIneA SSH user key used by the SSHChannel below.
        sshkey = os.getenv("SSHKEY", None)
        if not sshkey:
            raise Exception(
                "It is necessary to configure the SSHKEY variable when PARSL_ENV=linea"
            )

        # LIneA prod_gavo DB uri. Catalog DB.
        db_uri = os.getenv("DB_URI", None)
        if not db_uri:
            raise Exception(
                "It is necessary to configure the DB_URI variable when PARSL_ENV=linea"
            )

        # Environment script executed before the tasks start on the cluster.
        cluster_env_sh = pipeline_pred_occ.joinpath("cluster.sh")

        # Script DIR inside the job directory.
        script_dir = pipeline_root.joinpath("script_dir")
        script_dir.mkdir(parents=True, exist_ok=True)

        executors["linea"] = HighThroughputExecutor(
            label="linea",
            worker_logdir_root=str(script_dir),
            provider=SlurmProvider(
                partition="cpu",
                nodes_per_block=2,  # number of nodes
                cmd_timeout=240,  # duration for which the provider will wait for a command to be invoked on a remote system
                launcher=SrunLauncher(debug=True, overrides=""),
                init_blocks=2,
                min_blocks=3,
                max_blocks=12,
                parallelism=1,
                walltime="15:00:00",
                worker_init=f"source {cluster_env_sh}\n",
                channel=SSHChannel(
                    hostname="loginapl01",
                    username="app.tno",
                    key_filename=sshkey,
                    script_dir=str(script_dir),
                    envs={
                        "PARSL_ENV": "linea",
                        "CONDAPATH": str(condapath),
                        "PIPELINE_PREDIC_OCC": str(pipeline_pred_occ),
                        "PIPELINE_PATH": str(pipeline_path),
                        "PYTHONPATH": ":".join(
                            [
                                str(pipeline_root),
                                str(pipeline_pred_occ),
                                str(pipeline_path),
                            ]
                        ),
                        "PREDICT_OUTPUTS": str(predict_outputs),
                        "DB_URI": db_uri,
                    },
                ),
            ),
        )

    if parsl_env == "local":
        executors["local"] = HighThroughputExecutor(
            label="local",
            worker_debug=False,
            max_workers=4,
            provider=LocalProvider(
                min_blocks=1,
                init_blocks=1,
                max_blocks=1,
                parallelism=1,
                # Plain string: the original used an f-string with no
                # placeholders (ruff F541).
                worker_init="source /app/src/env.sh\n",
            ),
        )

    if key not in executors:
        # KeyError preserved for compatibility with the original
        # executors[key] lookup, but with an actionable message.
        raise KeyError(
            f"No Parsl executor '{key}' available for PARSL_ENV='{parsl_env}'."
        )

    return Config(executors=[executors[key]], strategy=None)
19 changes: 19 additions & 0 deletions predict_occultation/src/predict_occultation/cluster.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#!/bin/bash
# Worker bootstrap sourced on the LIneA Slurm cluster before pipeline tasks
# run (referenced as worker_init in parsl_config.py). Assumes CONDAPATH is
# exported into the environment by the submitting side — TODO confirm it is
# always set via the SSHChannel envs.
# export CONDAPATH=$SLURM_REMOTE_DIR/miniconda/bin
#export PIPELINE_PREDIC_OCC=/lustre/t1/scratch/users/app.tno/tno_testing/predict_occultation
#export PIPELINE_PATH=/lustre/t1/scratch/users/app.tno/tno_testing/predict_occultation/pipeline
#export PYTHONPATH=$PYTHONPATH:$PIPELINE_PATH:$PIPELINE_PREDIC_OCC

# Activate the miniconda installation and the pipeline's conda environment.
source $CONDAPATH/activate
conda activate py3

# export PIPELINE_PREDIC_OCC=$SLURM_REMOTE_DIR/predict_occultation
# export PIPELINE_PATH=$SLURM_REMOTE_DIR/predict_occultation/pipeline
# export PYTHONPATH=$SLURM_REMOTE_DIR:$PIPELINE_PREDIC_OCC:$PIPELINE_PATH:$PYTHONPATH

# export DB_URI=postgresql+psycopg2://untrustedprod:untrusted@desdb4.linea.gov.br:5432/prod_gavo
# export PARSL_ENV=linea

# Raise stack-size and max-process limits for the workers.
ulimit -s 100000
ulimit -u 100000
# Group-writable files so shared scratch areas stay usable by the team.
umask 0002
Loading

0 comments on commit b820608

Please sign in to comment.