From b820608636d517a65e5fc2d54ca6f8f6847c9146 Mon Sep 17 00:00:00 2001 From: Glauber Costa Vila-Verde Date: Fri, 16 Feb 2024 19:01:19 -0300 Subject: [PATCH] Fix import occ (#840) * Changes parsl config for running in cluster * fixed run in cluster --- .devcontainer/devcontainer.json | 1 + predict_occultation/Dockerfile | 8 +- predict_occultation/entrypoint.sh | 3 +- predict_occultation/src/asteroid/asteroid.py | 27 ++-- predict_occultation/src/dao/db_base.py | 5 - predict_occultation/src/library.py | 134 ------------------ predict_occultation/src/parsl_config.py | 125 ++++++++++++++++ .../src/predict_occultation/cluster.sh | 19 +++ .../src/predict_occultation/parsl_config.py | 63 -------- .../src/predict_occultation/pipeline/run.sh | 9 ++ predict_occultation/src/run_pred_occ.py | 66 ++++----- 11 files changed, 204 insertions(+), 256 deletions(-) create mode 100755 predict_occultation/src/parsl_config.py create mode 100755 predict_occultation/src/predict_occultation/cluster.sh delete mode 100755 predict_occultation/src/predict_occultation/parsl_config.py diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 1006be7e..5e3a8f2d 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -75,6 +75,7 @@ "mrmlnc.vscode-duplicate", "visualstudioexptteam.vscodeintellicode", "visualstudioexptteam.intellicode-api-usage-examples", + "eamodio.gitlens", // python "ms-python.python", "ms-python.vscode-pylance", diff --git a/predict_occultation/Dockerfile b/predict_occultation/Dockerfile index 45ddce48..9120e827 100644 --- a/predict_occultation/Dockerfile +++ b/predict_occultation/Dockerfile @@ -123,8 +123,12 @@ RUN groupadd --gid ${USERGID} ${USERNAME} \ ENV PIPELINE_ROOT=${APP_HOME}/src ENV PIPELINE_PREDIC_OCC=${PIPELINE_ROOT}/predict_occultation ENV PIPELINE_PATH=${PIPELINE_PREDIC_OCC}/pipeline -ENV WORKFLOW_PATH=${PIPELINE_ROOT} -ENV EXECUTION_PATH=${PIPELINE_ROOT} +ENV PREDICT_OUTPUTS=${APP_HOME}/outputs/predict_occultation + +# quando parsl_env = local o diretório remoto é o mesmo que o local. +# mas quando parsl_env = linea o diretório remoto é por ex: /lustre/t1/scratch/users/app.tno/tno_testing +ENV REMOTE_PIPELINE_ROOT=$PIPELINE_ROOT + COPY --chown=${USERNAME}:ton --chmod=775 src /app/src COPY --chown=${USERNAME}:ton --chmod=775 entrypoint.sh ${APP_HOME} diff --git a/predict_occultation/entrypoint.sh b/predict_occultation/entrypoint.sh index 29a64198..51d8289e 100755 --- a/predict_occultation/entrypoint.sh +++ b/predict_occultation/entrypoint.sh @@ -1,7 +1,8 @@ #!/bin/bash --login +umask ug=rwx,o=r echo "Running Rsync: ${PIPELINE_PREDIC_OCC}" -rsync -r /app/src/predict_occultation/ ${PIPELINE_PREDIC_OCC}/ +rsync -r /app/src/predict_occultation/ ${REMOTE_PIPELINE_ROOT}/predict_occultation # Baixa os arquivos bsp planetary e leap_second caso não existam. diff --git a/predict_occultation/src/asteroid/asteroid.py b/predict_occultation/src/asteroid/asteroid.py index b2cf2f18..e02b9a55 100644 --- a/predict_occultation/src/asteroid/asteroid.py +++ b/predict_occultation/src/asteroid/asteroid.py @@ -6,20 +6,14 @@ import pathlib from datetime import datetime as dt from datetime import timezone - -# import numpy as np from io import StringIO - -# from occviz import occultation_path_coeff from typing import Optional -# import csv import pandas as pd from asteroid.external_inputs import AsteroidExternalInputs from asteroid.jpl import findSPKID, get_bsp_from_jpl from dao import AsteroidDao, ObservationDao, OccultationDao -from library import dec2DMS # ra_hms_to_deg,; dec_hms_to_deg, -from library import date_to_jd, has_expired, ra2HMS +from library import date_to_jd, dec2DMS, has_expired, ra2HMS def serialize(obj): @@ -174,20 +168,21 @@ def get_log(self): def get_base_path(self): base_path = self.__BASE_PATH - if not base_path: - # Carrega as variaveis de configuração do arquivo config.ini - config = configparser.ConfigParser() - config.read(os.path.join(os.environ["EXECUTION_PATH"], "config.ini")) - base_path = config["DEFAULT"].get("AsteroidPath") - self.__BASE_PATH = base_path + # if not base_path: + # # Carrega as variaveis de configuração do arquivo config.ini + # config = configparser.ConfigParser() + # config.read(os.path.join(os.environ["EXECUTION_PATH"], "config.ini")) + # base_path = config["DEFAULT"].get("AsteroidPath") + # self.__BASE_PATH = base_path return base_path def get_jpl_email(self): # Carrega as variaveis de configuração do arquivo config.ini - config = configparser.ConfigParser() - config.read(os.path.join(os.environ["EXECUTION_PATH"], "config.ini")) - JPL_EMAIL = config["DEFAULT"].get("JplEmail", "sso-portal@linea.gov.br") + # config = configparser.ConfigParser() + # config.read(os.path.join(os.environ["EXECUTION_PATH"], "config.ini")) + # JPL_EMAIL = config["DEFAULT"].get("JplEmail", "sso-portal@linea.gov.br") + JPL_EMAIL = "sso-portal@linea.gov.br" return JPL_EMAIL def get_or_create_dir(self): diff --git a/predict_occultation/src/dao/db_base.py b/predict_occultation/src/dao/db_base.py index 4189adeb..c670b29b 100644 --- a/predict_occultation/src/dao/db_base.py +++ b/predict_occultation/src/dao/db_base.py @@ -22,7 +22,6 @@ def get_db_uri(self): # DB_USER, DB_PASS, DB_HOST, DB_PORT, DB_NAME # db_uri = "postgresql+psycopg2://%s:%s@%s:%s/%s" % ( # "postgres", "postgres", "172.18.0.2", "5432", "tno_v2") - # DB_URI=postgresql+psycopg2://USER:PASS@HOST:PORT/DB_NAME try: db_uri = os.environ["DB_URI_ADMIN"] @@ -34,10 +33,6 @@ def get_db_uri(self): ) def get_db_engine(self): - # Carrega as variaveis de configuração do arquivo config.ini - config = configparser.ConfigParser() - config.read(os.path.join(os.environ["EXECUTION_PATH"], "config.ini")) - engine = create_engine(self.get_db_uri(), poolclass=NullPool) return engine diff --git a/predict_occultation/src/library.py b/predict_occultation/src/library.py index 3730f77a..9563a351 100644 --- a/predict_occultation/src/library.py +++ b/predict_occultation/src/library.py @@ -59,140 +59,6 @@ def count_lines(filepath): return num_lines -def ingest_occultations(asteroid_id, name, number, filepath, start_period, end_period): - - from io import StringIO - - import pandas as pd - from dao import OccultationDao - from library import dec_hms_to_deg, ra_hms_to_deg - - dao = OccultationDao() - - # Apaga as occultations já registradas para este asteroid antes de inserir. - dao.delete_by_asteroid_id(asteroid_id, start_period, end_period) - - # Le o arquivo occultation table e cria um dataframe - # occultation_date;ra_star_candidate;dec_star_candidate;ra_object;dec_object;ca;pa;vel;delta;g;j;h;k;long;loc_t;off_ra;off_de;pm;ct;f;e_ra;e_de;pmra;pmde - df = pd.read_csv( - filepath, - delimiter=";", - header=None, - skiprows=1, - names=[ - "occultation_date", - "ra_star_candidate", - "dec_star_candidate", - "ra_object", - "dec_object", - "ca", - "pa", - "vel", - "delta", - "g", - "j", - "h", - "k", - "long", - "loc_t", - "off_ra", - "off_de", - "pm", - "ct", - "f", - "e_ra", - "e_de", - "pmra", - "pmde", - ], - ) - - # Adiciona as colunas de coordenadas de target e star convertidas para degrees. - df["ra_target_deg"] = df["ra_object"].apply(ra_hms_to_deg) - df["dec_target_deg"] = df["dec_object"].apply(dec_hms_to_deg) - df["ra_star_deg"] = df["ra_star_candidate"].apply(ra_hms_to_deg) - df["dec_star_deg"] = df["dec_star_candidate"].apply(dec_hms_to_deg) - - # Adicionar colunas para asteroid id, name e number - df["name"] = name - df["number"] = number - df["asteroid_id"] = asteroid_id - - # Remover valores como -- ou - - df["ct"] = df["ct"].str.replace("--", "") - df["f"] = df["f"].str.replace("-", "") - - # Altera o nome das colunas - df = df.rename( - columns={ - "occultation_date": "date_time", - "ra_object": "ra_target", - "dec_object": "dec_target", - "ca": "closest_approach", - "pa": "position_angle", - "vel": "velocity", - "off_de": "off_dec", - "pm": "proper_motion", - "f": "multiplicity_flag", - "e_de": "e_dec", - "pmde": "pmdec", - } - ) - - # Altera a ordem das colunas para coincidir com a da tabela - df = df.reindex( - columns=[ - "name", - "number", - "date_time", - "ra_star_candidate", - "dec_star_candidate", - "ra_target", - "dec_target", - "closest_approach", - "position_angle", - "velocity", - "delta", - "g", - "j", - "h", - "k", - "long", - "loc_t", - "off_ra", - "off_dec", - "proper_motion", - "ct", - "multiplicity_flag", - "e_ra", - "e_dec", - "pmra", - "pmdec", - "ra_star_deg", - "dec_star_deg", - "ra_target_deg", - "dec_target_deg", - "asteroid_id", - ] - ) - - data = StringIO() - df.to_csv( - data, - sep="|", - header=True, - index=False, - ) - data.seek(0) - - rowcount = dao.import_occultations(data) - - del df - del data - - return rowcount - - def has_expired(date, days=60): from datetime import datetime diff --git a/predict_occultation/src/parsl_config.py b/predict_occultation/src/parsl_config.py new file mode 100755 index 00000000..062c2750 --- /dev/null +++ b/predict_occultation/src/parsl_config.py @@ -0,0 +1,125 @@ +import os +from pathlib import Path + +from parsl.channels import SSHChannel +from parsl.config import Config +from parsl.executors import HighThroughputExecutor +from parsl.launchers import SrunLauncher +from parsl.providers import LocalProvider, SlurmProvider + + +def get_config(key, jobpath): + """ + Creates an instance of the Parsl configuration + + Args: + key (str): The key of the configuration to be returned. + Options are: 'local' or 'linea'. + + Returns: + config: Parsl config instance. + """ + + parsl_env = os.getenv("PARSL_ENV", "local") + + if parsl_env == "linea": + # Check Linea PATH envs + pipeline_root = os.getenv("REMOTE_PIPELINE_ROOT", None) + if not pipeline_root: + raise Exception( + "It is necessary to configure the REMOTE_PIPELINE_ROOT variable when PARSL_ENV=linea" + ) + + pipeline_root = Path(pipeline_root) + condapath = pipeline_root.joinpath("miniconda/bin") + pipeline_pred_occ = pipeline_root.joinpath("predict_occultation") + pipeline_path = pipeline_root.joinpath("predict_occultation/pipeline") + + # Diretório de Outputs + predict_outputs = os.getenv("PREDICT_OUTPUTS", None) + if not predict_outputs: + raise Exception( + "It is necessary to configure the PREDICT_OUTPUTS variable when PARSL_ENV=linea" + ) + predict_outputs = Path(predict_outputs) + predict_outputs.mkdir(parents=True, exist_ok=True) + + # Linea SSH user keys + # sshkey = os.getenv("SSHKEY", "~/.ssh/id_rsa") + sshkey = os.getenv("SSHKEY", None) + if not sshkey: + raise Exception( + "It is necessary to configure the SSHKEY variable when PARSL_ENV=linea" + ) + + # Linea DB prod_gavo DB uri. Catalog DB. + db_uri = os.getenv("DB_URI", None) + if not db_uri: + raise Exception( + "It is necessary to configure the DB_URI variable when PARSL_ENV=linea" + ) + + # Env.sh que sera executado antes de iniciar as tasks no cluster + cluster_env_sh = pipeline_pred_occ.joinpath("cluster.sh") + + # Script DIR dentro do diretorio do job + script_dir = pipeline_root.joinpath("script_dir") + script_dir.mkdir(parents=True, exist_ok=True) + + executors = { + "linea": HighThroughputExecutor( + label="linea", + worker_logdir_root=str(script_dir), + provider=SlurmProvider( + partition="cpu", + nodes_per_block=2, # number of nodes + cmd_timeout=240, # duration for which the provider will wait for a command to be invoked on a remote system + launcher=SrunLauncher(debug=True, overrides=""), + init_blocks=2, + min_blocks=3, + max_blocks=12, + parallelism=1, + walltime="15:00:00", + worker_init=f"source {cluster_env_sh}\n", + channel=SSHChannel( + hostname="loginapl01", + username="app.tno", + key_filename=sshkey, + script_dir=str(script_dir), + envs={ + "PARSL_ENV": "linea", + "CONDAPATH": str(condapath), + "PIPELINE_PREDIC_OCC": str(pipeline_pred_occ), + "PIPELINE_PATH": str(pipeline_path), + "PYTHONPATH": ":".join( + [ + str(pipeline_root), + str(pipeline_pred_occ), + str(pipeline_path), + ] + ), + "PREDICT_OUTPUTS": str(predict_outputs), + "DB_URI": db_uri, + }, + ), + ), + ) + } + + if parsl_env == "local": + executors = { + "local": HighThroughputExecutor( + label="local", + worker_debug=False, + max_workers=4, + provider=LocalProvider( + min_blocks=1, + init_blocks=1, + max_blocks=1, + parallelism=1, + worker_init=f"source /app/src/env.sh\n", + ), + ), + } + + return Config(executors=[executors[key]], strategy=None) diff --git a/predict_occultation/src/predict_occultation/cluster.sh b/predict_occultation/src/predict_occultation/cluster.sh new file mode 100755 index 00000000..08419e2e --- /dev/null +++ b/predict_occultation/src/predict_occultation/cluster.sh @@ -0,0 +1,19 @@ +#!/bin/bash +# export CONDAPATH=$SLURM_REMOTE_DIR/miniconda/bin +#export PIPELINE_PREDIC_OCC=/lustre/t1/scratch/users/app.tno/tno_testing/predict_occultation +#export PIPELINE_PATH=/lustre/t1/scratch/users/app.tno/tno_testing/predict_occultation/pipeline +#export PYTHONPATH=$PYTHONPATH:$PIPELINE_PATH:$PIPELINE_PREDIC_OCC + +source $CONDAPATH/activate +conda activate py3 + +# export PIPELINE_PREDIC_OCC=$SLURM_REMOTE_DIR/predict_occultation +# export PIPELINE_PATH=$SLURM_REMOTE_DIR/predict_occultation/pipeline +# export PYTHONPATH=$SLURM_REMOTE_DIR:$PIPELINE_PREDIC_OCC:$PIPELINE_PATH:$PYTHONPATH + +# export DB_URI=postgresql+psycopg2://untrustedprod:untrusted@desdb4.linea.gov.br:5432/prod_gavo +# export PARSL_ENV=linea + +ulimit -s 100000 +ulimit -u 100000 +umask 0002 diff --git a/predict_occultation/src/predict_occultation/parsl_config.py b/predict_occultation/src/predict_occultation/parsl_config.py deleted file mode 100755 index 3c7e4d78..00000000 --- a/predict_occultation/src/predict_occultation/parsl_config.py +++ /dev/null @@ -1,63 +0,0 @@ -import os - -from parsl.channels import SSHChannel -from parsl.config import Config -from parsl.executors import HighThroughputExecutor -from parsl.launchers import SrunLauncher -from parsl.providers import LocalProvider, SlurmProvider - - -def get_config(key): - """ - Creates an instance of the Parsl configuration - - Args: - key (str): The key of the configuration to be returned. - Options are: 'local' or 'linea'. - - Returns: - config: Parsl config instance. - """ - - pipe_dir = os.getenv("WORKFLOW_PATH", ".") - slurm_dir = os.getenv("SLURM_REMOTE_DIR", pipe_dir) - sshkey = os.getenv("SSHKEY", "~/.ssh/id_rsa") - - executors = { - "linea": HighThroughputExecutor( - label="linea", - worker_logdir_root=f'{"/".join([slurm_dir, "script_dir"])}', - provider=SlurmProvider( - partition="cpu", - nodes_per_block=2, # number of nodes - cmd_timeout=240, # duration for which the provider will wait for a command to be invoked on a remote system - launcher=SrunLauncher(debug=True, overrides=""), - init_blocks=2, - min_blocks=3, - max_blocks=12, - parallelism=1, - walltime="15:00:00", - worker_init=f'source {"/".join([slurm_dir, "cluster.sh"])}\n', - channel=SSHChannel( - hostname="loginapl01", - username="app.tno", - key_filename=sshkey, - script_dir=f'{"/".join([slurm_dir, "script_dir"])}', - ), - ), - ), - "local": HighThroughputExecutor( - label="local", - worker_debug=False, - max_workers=4, - provider=LocalProvider( - min_blocks=1, - init_blocks=1, - max_blocks=1, - parallelism=1, - worker_init=f"source /app/src/env.sh\n", - ), - ), - } - - return Config(executors=[executors[key]], strategy=None) diff --git a/predict_occultation/src/predict_occultation/pipeline/run.sh b/predict_occultation/src/predict_occultation/pipeline/run.sh index 60de9c20..c85edfb6 100755 --- a/predict_occultation/src/predict_occultation/pipeline/run.sh +++ b/predict_occultation/src/predict_occultation/pipeline/run.sh @@ -11,7 +11,16 @@ _bsp_planetary=$8 umask 0002 +echo "Enviroment Vars ==========================================" + +echo 'PARSL_ENV: ' $PARSL_ENV +echo 'CONDAPATH: ' $CONDAPATH +echo 'PIPELINE_PREDIC_OCC: ' $PIPELINE_PREDIC_OCC echo 'PIPELINE_PATH: ' $PIPELINE_PATH +echo 'PYTHONPATH: ' $PYTHONPATH +echo 'PREDICT_OUTPUTS: ' $PREDICT_OUTPUTS + +echo "==========================================================" TMPDIR=`echo $RANDOM | md5sum | head -c 5; echo;` export DIR_DATA=/tmp/$TMPDIR diff --git a/predict_occultation/src/run_pred_occ.py b/predict_occultation/src/run_pred_occ.py index ce1e405d..77def755 100644 --- a/predict_occultation/src/run_pred_occ.py +++ b/predict_occultation/src/run_pred_occ.py @@ -1,6 +1,5 @@ # -*- coding: utf-8 -*- -import configparser import json import os import pathlib @@ -13,7 +12,6 @@ import humanize import pandas as pd -import tqdm from asteroid import Asteroid from dao import ( AsteroidDao, @@ -23,8 +21,8 @@ ) try: + from parsl_config import get_config from predict_occultation.app import run_pipeline - from predict_occultation.parsl_config import get_config except Exception as error: print("Error: %s" % str(error)) raise ("Predict Occultation pipeline not installed!") @@ -225,8 +223,6 @@ def remove_job_directory(jobid): def get_job_path(jobid): """Retorna o path para o diretorio do job, cria o diretorio caso nao exista.""" - # config = get_configs() - # orbit_trace_root = config["DEFAULT"].get("PredictOccultationJobPath") orbit_trace_root = os.getenv("PREDICT_OUTPUTS", "/app/outputs/predict_occultation") folder_name = f"{jobid}" # folder_name = f"teste_{job['id']}" @@ -239,6 +235,10 @@ def get_job_path(jobid): if not job_path.exists(): job_path.mkdir(parents=True, mode=0o775) + # Parsl Script dir + script_dir = job_path.joinpath("script_dir") + script_dir.mkdir(parents=True, mode=0o775) + print(f"Job Path: {job_path}") return job_path @@ -416,13 +416,6 @@ def ingest_job_results(job_path, job_id): return rowcount -def get_configs(): - # Carrega as variaveis de configuração do arquivo config.ini - config = configparser.ConfigParser() - config.read("config.ini") - return config - - def read_job_json_by_id(jobid): dao = PredictOccultationJobDao() job_db = dao.get_job_by_id(jobid) @@ -594,15 +587,6 @@ def submit_tasks(jobid: int): log.info("Update Job status to running.") update_job(job) - # =========================== Parsl =========================== - log.info("Settings Parsl configurations") - envname = os.getenv("PARSL_ENV", "linea") - parsl_conf = get_config(envname) - # Altera o diretório runinfo para dentro do diretório do job. - parsl_conf.run_dir = os.path.join(current_path, "runinfo") - parsl.clear() - parsl.load(parsl_conf) - # =========================== Parameters =========================== # ASTEROID_PATH: Diretório onde serão armazenados todos os arquivos referentes @@ -611,6 +595,7 @@ def submit_tasks(jobid: int): # Atenção: Precisar permitir uma quantidade grande de acessos de leitura e escrita simultaneas. ASTEROID_PATH = current_path.joinpath("asteroids") ASTEROID_PATH.mkdir(parents=True, exist_ok=False) + log.debug(f"Asteroid PATH: [{ASTEROID_PATH}]") # Parametros usados na Predição de Ocultação @@ -662,6 +647,19 @@ def submit_tasks(jobid: int): DES_OBSERVATIONS_DAYS_TO_EXPIRE = inputs_days_to_expire log.debug("Input days to expire: [%s]" % inputs_days_to_expire) + # =========================== Parsl =========================== + log.info("Settings Parsl configurations") + envname = os.getenv("PARSL_ENV", "linea") + parsl_conf = get_config(envname, current_path) + # Altera o diretório runinfo para dentro do diretório do job. + parsl_conf.run_dir = os.path.join(current_path, "runinfo") + # parsl_conf.executors[0].provider.channel.script_dir = os.path.join( + # current_path, "script_dir" + # ) + + parsl.clear() + parsl.load(parsl_conf) + # ======================= Generate dates file ======================= # Arquivo de datas pode ser o mesmo para todos os asteroids. # Executa o programa fortran geradata. @@ -1126,7 +1124,6 @@ def consolidate_job_results(consolidated, job_path, log): df_result = pd.DataFrame( consolidated, columns=[ - "ast_id", "name", "number", "base_dynclass", @@ -1200,20 +1197,16 @@ def consolidate_job_results(consolidated, job_path, log): def complete_job(job, log, status): consolidated_filepath = pathlib.Path(job.get("path"), "job_consolidated.csv") - if consolidated_filepath.exists(): + if not consolidated_filepath.exists(): + raise Exception(f"Consolidated file not exists. [{consolidated_filepath}]") - df = pd.read_csv(consolidated_filepath, delimiter=";") + df = pd.read_csv(consolidated_filepath, delimiter=";") - l_status = df["status"].to_list() - count_success = int(l_status.count(1)) - count_failures = int(l_status.count(2)) - occultations = int(df["ing_occ_count"].sum()) - ast_with_occ = int((df["ing_occ_count"] != 0).sum()) - else: - count_success = 0 - count_failures = job["count_asteroids"] - occultations = 0 - ast_with_occ = 0 + l_status = df["status"].to_list() + count_success = int(l_status.count(1)) + count_failures = int(l_status.count(2)) + occultations = int(df["ing_occ_count"].sum()) + ast_with_occ = int((df["ing_occ_count"] != 0).sum()) t0 = datetime.fromisoformat(job.get("start")) t1 = datetime.now(tz=timezone.utc) @@ -1282,7 +1275,10 @@ def generate_dates_file( "geradata", stdin=subprocess.PIPE, stdout=fp, shell=True ) p.communicate(str.encode(strParameters)) - log.debug(f"Geradata Command: [geradata {strParameters}]") + + log.debug( + "Geradata Command: [geradata %s]" % strParameters.replace("\n", " ") + ) if filepath.exists(): filepath.chmod(0o664)