Skip to content

Commit

Permalink
Add get_queue_info helper function
Browse files Browse the repository at this point in the history
  • Loading branch information
cat-bro committed Jun 11, 2024
1 parent 50ac513 commit 83e0c42
Showing 1 changed file with 41 additions and 0 deletions.
41 changes: 41 additions & 0 deletions tpv/core/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import random
from functools import reduce
from galaxy import model
from sqlalchemy import func, text, Float


GIGABYTES = 1024.0**3

Expand Down Expand Up @@ -125,3 +127,42 @@ def get_dataset_attributes(datasets):
'size': get_dataset_size(i.dataset.dataset)}
for i in datasets or {}
}

def get_queue_info(app):
# query the Galaxy DB and return a dictionary of sums of cores/mem/gpus
# for queued and running jobs. This requires 'tpv_mem', 'tpv_cores' and
# 'tpv_gpus' params to be stored for all non-terminal jobs. These params
# can be inherited from the default tool in the TPV shared database.
query = app.model.context.query(
model.Job.destination_id,
model.Job.state,
func.count(model.Job.id).label('job_count'),
func.sum(
func.cast(text("encode(destination_params, 'escape')::json ->> 'tpv_cores'"), Float)
).label('sum_cores'),
func.sum(
func.cast(text("encode(destination_params, 'escape')::json ->> 'tpv_mem'"), Float)
).label('sum_mem'),
func.sum(
func.cast(text("encode(destination_params, 'escape')::json ->> 'tpv_gpus'"), Float)
).label('sum_gpus'),
).filter(
model.Job.state.in_(['queued', 'running'])
).group_by(
model.Job.destination_id,
model.Job.state
)
results = query.all()

db_queue_info = {}
for row in results:
destination_id, state, job_count, sum_cores, sum_mem, sum_gpus = row
if not destination_id in db_queue_info:
db_queue_info[destination_id] = {}
db_queue_info[destination_id][state] = {
"sum_cores": sum_cores,
"sum_mem": sum_mem,
"sum_gpus": sum_gpus,
"job_count": job_count,
}
return db_queue_info

0 comments on commit 83e0c42

Please sign in to comment.