Skip to content

Commit

Permalink
The function that updates the progress of a job has been changed.
Browse files Browse the repository at this point in the history
  • Loading branch information
glaubervila committed Feb 25, 2025
1 parent c4489dc commit cf0be46
Show file tree
Hide file tree
Showing 6 changed files with 213 additions and 85 deletions.
15 changes: 14 additions & 1 deletion backend/tno/views/prediction_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,25 @@ def status(self, request, pk=None):
@action(detail=True, methods=["post"])
def cancel_job(self, request, pk=None):
"""
Sinaliza o pediio de cancelamento um Prediction job, alterando status para aborting
Sinaliza o pedido de cancelamento um Prediction job, alterando status para aborting
"""
job = self.get_object()
# Se o job estiver idle=1 ou running=2
if job.status <= 2:
# Marca o job como 7-Aborting
job.status = 7
job.save()

# Alterar todas as tasks do job para aborting
# 3-Queued
tasks = job.predictionjobresult_set.filter(status=3)
tasks.update(status=5)

# Verificar se tem alguma task em execução
# se não tiver altera o status direto para 5-Aborted
tasks = job.predictionjobresult_set.filter(status=4)
if len(tasks) == 0:
job.status = 5
job.save()
result = PredictionJobSerializer(job)
return Response(result.data)
2 changes: 1 addition & 1 deletion predict_occultation/src/daemon.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ set -o pipefail
source /app/src/env.sh

echo "Starting the daemon"
while true; do python /app/src/run_daemon.py ; sleep 30; done
while true; do python /app/src/run_daemon.py ; sleep 10; done
5 changes: 5 additions & 0 deletions predict_occultation/src/dao/db_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ def fetch_one_dict(self, stm):
else:
return None

def fetch_scalar(self, stm):
    """Execute *stm* and return the first column of its first row."""
    with self.get_db_engine().connect() as connection:
        return connection.execute(stm).scalar()

def get_job_by_id(self, id):

tbl = self.get_table(tablename="des_astrometryjob")
Expand Down
11 changes: 10 additions & 1 deletion predict_occultation/src/dao/predict_occultation_job_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

from dao.db_base import DBBase
from sqlalchemy.sql import and_, delete, select

from sqlalchemy import func
from typing import List

class PredictOccultationJobResultDao(DBBase):
def __init__(self):
Expand Down Expand Up @@ -66,3 +67,11 @@ def by_job_id(self, job_id):
stm = select(self.tbl.c).where(and_(self.tbl.c.job_id == job_id))

return self.fetch_all_dict(stm)

def count_by_job_id(self, job_id, status: List = None):
    """Count the job-result (task) rows belonging to a job.

    Parameters
    ----------
    job_id : int
        Primary key of the prediction job.
    status : List, optional
        When given, only rows whose ``status`` is in this list are
        counted (e.g. ``[3, 4, 5, 6]`` for tasks in a terminal state).

    Returns
    -------
    int
        Number of matching rows.
    """
    # and_() around a single condition is a no-op, so the job filter
    # is expressed directly.
    stm = (
        select([func.count()])
        .select_from(self.tbl)
        .where(self.tbl.c.job_id == job_id)
    )
    # A None/empty status list means "count every task of the job".
    if status:
        stm = stm.where(self.tbl.c.status.in_(status))

    return self.fetch_scalar(stm)
13 changes: 13 additions & 0 deletions predict_occultation/src/run_daemon.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from run_pred_occ import predict_job_queue
from run_pred_occ import get_job_running, update_job_progress
from run_pred_occ import run_job as predict_run_job


Expand All @@ -15,7 +16,19 @@ def predict_occultation_queue():

if __name__ == "__main__":
try:
# Submete jobs em idle para execução.
run_id = predict_occultation_queue()

# Verifica se existe algum job em execução.
running_id = get_job_running()
if running_id:
print(f"Job running: [{running_id}]")
update_job_progress(running_id)
else:
print("No job running.")



except Exception as e:
# Este exception é para evitar que a daemon desligue em caso de falha.
print(e)
252 changes: 170 additions & 82 deletions predict_occultation/src/run_pred_occ.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,70 @@ def get_job_running():
return job.get("id")


def update_job_progress(jobid):
    """Refresh the two progress records (stages) of a running job.

    TODO: temporary implementation for updating the job progress.

    Stage 1 tracks task submission and stage 2 tracks task execution.
    A stage is reported as 3-Completed once its counter reaches the
    job's total number of asteroids; otherwise it is 2-Running.

    Parameters
    ----------
    jobid : int
        Primary key of the prediction job to update.
    """
    dao_job = PredictOccultationJobDao()
    dao_job_result = PredictOccultationJobResultDao()

    job = dao_job.get_job_by_id(jobid)

    # Total number of asteroids expected for this job.
    # NOTE(review): assumes count_asteroids is already filled in on the
    # job row — confirm it is never None at this point.
    count_asteroids = job["count_asteroids"]

    # Total number of tasks already submitted.
    submited_tasks = dao_job_result.count_by_job_id(jobid)

    # Job result/task status codes:
    # (1, "Idle"), (2, "Running"), (3, "Completed"), (4, "Failed"),
    # (5, "Aborted"), (6, "Warning"), (7, "Aborting")

    # Tasks that reached a terminal state.
    completed_tasks = dao_job_result.count_by_job_id(jobid, status=[3, 4, 5, 6])

    success_tasks = dao_job_result.count_by_job_id(jobid, status=[3])
    failed_tasks = dao_job_result.count_by_job_id(jobid, status=[4])

    # Stage 1 - submission of tasks: 3-Completed when every asteroid
    # has a submitted task, 2-Running otherwise.
    stage1_status = 3 if submited_tasks == count_asteroids else 2
    update_progress_status(
        jobid,
        step=1,
        status=stage1_status,
        count=count_asteroids,
        current=submited_tasks,
        t0=job["start"],
    )

    # Stage 2 - execution of tasks: 3-Completed when every asteroid's
    # task reached a terminal state, 2-Running otherwise.
    stage2_status = 3 if completed_tasks == count_asteroids else 2
    update_progress_status(
        jobid,
        step=2,
        status=stage2_status,
        count=submited_tasks,
        current=completed_tasks,
        success=success_tasks,
        failures=failed_tasks,
        t0=job["start"],
    )



def predict_job_queue():
# Verifica se ha algum job sendo executado.
if has_job_running():
Expand Down Expand Up @@ -698,16 +762,16 @@ def submit_tasks(jobid: int):
step1_success += 1
current_idx += 1

update_progress_status(
jobid,
step=1,
status=2,
count=len(asteroids),
current=current_idx,
success=step1_success,
failures=step1_failures,
t0=hb_t0,
)
# update_progress_status(
# jobid,
# step=1,
# status=2,
# count=len(asteroids),
# current=current_idx,
# success=step1_success,
# failures=step1_failures,
# t0=hb_t0,
# )

# ======================= Submeter o Job por asteroide ==========================
# Adicionar os asteroids a tabela job result ( que agora passa a representar as job tasks.)
Expand Down Expand Up @@ -758,29 +822,29 @@ def submit_tasks(jobid: int):
log.error("Error running asteroid %s" % name)
continue

update_progress_status(
jobid,
step=1,
status=3,
count=len(asteroids),
current=current_idx,
success=step1_success,
failures=step1_failures,
t0=hb_t0,
)
# update_progress_status(
# jobid,
# step=1,
# status=3,
# count=len(asteroids),
# current=current_idx,
# success=step1_success,
# failures=step1_failures,
# t0=hb_t0,
# )

log.info("All jobs have been submitted.")

update_progress_status(
jobid,
step=2,
status=2,
count=len(jobs_asteroids),
current=step2_current_idx,
success=step2_success,
failures=step2_failures,
t0=hb_t0,
)
# update_progress_status(
# jobid,
# step=2,
# status=2,
# count=len(jobs_asteroids),
# current=step2_current_idx,
# success=step2_success,
# failures=step2_failures,
# t0=hb_t0,
# )

log.debug(f"Jobs to Parsl: [{len(jobs_asteroids)}]")

Expand Down Expand Up @@ -838,16 +902,16 @@ def submit_tasks(jobid: int):
if is_abort:
raise AbortError("Job ID %s aborted!" % str(jobid), -1)

update_progress_status(
jobid,
step=2,
status=3,
count=len(jobs_asteroids),
current=step2_current_idx,
success=step2_success,
failures=step2_failures,
t0=hb_t0,
)
# update_progress_status(
# jobid,
# step=2,
# status=3,
# count=len(jobs_asteroids),
# current=step2_current_idx,
# success=step2_success,
# failures=step2_failures,
# t0=hb_t0,
# )

job.update({"status": "Completed"})
except AbortError as e:
Expand All @@ -866,27 +930,27 @@ def submit_tasks(jobid: int):
}
)

update_progress_status(
jobid,
step=1,
status=5,
count=step1_count,
current=current_idx,
success=step1_success,
failures=step1_failures,
t0=hb_t0,
)

update_progress_status(
jobid,
step=2,
status=5,
count=step2_count,
current=step2_current_idx,
success=step2_success,
failures=step2_failures,
t0=hb_t0,
)
# update_progress_status(
# jobid,
# step=1,
# status=5,
# count=step1_count,
# current=current_idx,
# success=step1_success,
# failures=step1_failures,
# t0=hb_t0,
# )

# update_progress_status(
# jobid,
# step=2,
# status=5,
# count=step2_count,
# current=step2_current_idx,
# success=step2_success,
# failures=step2_failures,
# t0=hb_t0,
# )

except Exception as e:
trace = traceback.format_exc()
Expand All @@ -902,27 +966,27 @@ def submit_tasks(jobid: int):
}
)

update_progress_status(
jobid,
step=1,
status=4,
count=step1_count,
current=current_idx,
success=step1_success,
failures=step1_failures,
t0=hb_t0,
)

update_progress_status(
jobid,
step=2,
status=4,
count=step2_count,
current=step2_current_idx,
success=step2_success,
failures=step2_failures,
t0=hb_t0,
)
# update_progress_status(
# jobid,
# step=1,
# status=4,
# count=step1_count,
# current=current_idx,
# success=step1_success,
# failures=step1_failures,
# t0=hb_t0,
# )

# update_progress_status(
# jobid,
# step=2,
# status=4,
# count=step2_count,
# current=step2_current_idx,
# success=step2_success,
# failures=step2_failures,
# t0=hb_t0,
# )

finally:
log.debug("-----------------------------------------------")
Expand Down Expand Up @@ -1033,6 +1097,30 @@ def complete_job(job, log, status):
# write_job_file(current_path, job)
update_job(job)


# # Update Job Progress
# update_progress_status(
# job['id'],
# step=1,
# status=3,
# count=step1_count,
# current=current_idx,
# success=step1_success,
# failures=step1_failures,
# t0=hb_t0,
# )

# update_progress_status(
# job['id'],
# step=2,
# status=3,
# count=step2_count,
# current=step2_current_idx,
# success=step2_success,
# failures=step2_failures,
# t0=hb_t0,
# )

# Remove o diretório de asteroids do job.
if not job["debug"]:
log.debug("Removing asteroid directory.")
Expand Down

0 comments on commit cf0be46

Please sign in to comment.