diff --git a/tpv/core/helpers.py b/tpv/core/helpers.py index 343b79c..4304a72 100644 --- a/tpv/core/helpers.py +++ b/tpv/core/helpers.py @@ -8,6 +8,8 @@ import random from functools import reduce from galaxy import model +from sqlalchemy import func, text, Float + GIGABYTES = 1024.0**3 @@ -125,3 +127,42 @@ def get_dataset_attributes(datasets): 'size': get_dataset_size(i.dataset.dataset)} for i in datasets or {} } + +def get_queue_info(app): + # query the Galaxy DB and return a dictionary of sums of cores/mem/gpus + # for queued and running jobs. This requires 'tpv_mem', 'tpv_cores' and + # 'tpv_gpus' params to be stored for all non-terminal jobs. These params + # can be inherited from the default tool in the TPV shared database. + query = app.model.context.query( + model.Job.destination_id, + model.Job.state, + func.count(model.Job.id).label('job_count'), + func.sum( + func.cast(text("encode(destination_params, 'escape')::json ->> 'tpv_cores'"), Float) + ).label('sum_cores'), + func.sum( + func.cast(text("encode(destination_params, 'escape')::json ->> 'tpv_mem'"), Float) + ).label('sum_mem'), + func.sum( + func.cast(text("encode(destination_params, 'escape')::json ->> 'tpv_gpus'"), Float) + ).label('sum_gpus'), + ).filter( + model.Job.state.in_(['queued', 'running']) + ).group_by( + model.Job.destination_id, + model.Job.state + ) + results = query.all() + + db_queue_info = {} + for row in results: + destination_id, state, job_count, sum_cores, sum_mem, sum_gpus = row + if not destination_id in db_queue_info: + db_queue_info[destination_id] = {} + db_queue_info[destination_id][state] = { + "sum_cores": sum_cores, + "sum_mem": sum_mem, + "sum_gpus": sum_gpus, + "job_count": job_count, + } + return db_queue_info