diff --git a/CHANGELOG.md b/CHANGELOG.md
index aad36508..7b83eb65 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -13,6 +13,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - New Classes to interact with Database QoS (WIP)
     - `pyslurm.db.QualityOfService`
     - `pyslurm.db.QualitiesOfService`
+- Add `truncate_time` option to `pyslurm.db.JobFilter`, which is the same as
+  `-T` / `--truncate` from `sacct`.
+- Add new attributes to `pyslurm.db.Jobs` that make gathering statistics for a
+  collection of Jobs more convenient.
+- Fix `allocated_gres` attribute in the `pyslurm.Node` class returning nothing.
+- Add new `idle_memory` and `allocated_tres` attributes to the `pyslurm.Node` class.
+- Fix Node State being displayed as `ALLOCATED` when it should actually be
+  `MIXED`.

 ## [23.2.2](https://github.com/PySlurm/pyslurm/releases/tag/v23.2.2) - 2023-07-18
diff --git a/pyslurm/db/job.pxd b/pyslurm/db/job.pxd
index 70ef0311..a06791bf 100644
--- a/pyslurm/db/job.pxd
+++ b/pyslurm/db/job.pxd
@@ -123,6 +123,12 @@ cdef class JobFilter:
             Instruct the slurmdbd to also send the job environment(s)
             Note: This requires specifying explicit job ids, and is mutually
             exclusive with `with_script`
+        truncate_time (bool):
+            Truncate start and end time.
+            For example, when a Job has actually started before the requested
+            `start_time`, the time will be truncated to `start_time`. The same
+            logic applies for `end_time`. This is like the `-T` / `--truncate`
+            option from `sacct`.
     """
     cdef slurmdb_job_cond_t *ptr

@@ -149,11 +155,60 @@ cdef class JobFilter:
         nodelist
         with_script
         with_env
+        truncate_time


 cdef class Jobs(MultiClusterMap):
-    """A [`Multi Cluster`][pyslurm.xcollections.MultiClusterMap] collection of [pyslurm.db.Job][] objects."""
-    pass
+    """A [`Multi Cluster`][pyslurm.xcollections.MultiClusterMap] collection of [pyslurm.db.Job][] objects.
+
+    Args:
+        jobs (Union[list[int], dict[int, pyslurm.db.Job], str], optional=None):
+            Jobs to initialize this collection with.
+
+    Attributes:
+        consumed_energy (int):
+            Total amount of energy consumed, in joules.
+        disk_read (int):
+            Total number of bytes read.
+        disk_write (int):
+            Total number of bytes written.
+        page_faults (int):
+            Total number of page faults.
+        resident_memory (int):
+            Total Resident Set Size (RSS) used, in bytes.
+        virtual_memory (int):
+            Total Virtual Memory Size (VSZ) used, in bytes.
+        elapsed_cpu_time (int):
+            Total amount of time used (elapsed time * cpu count), in seconds.
+            This is not the real CPU efficiency, but rather the total amount
+            of CPU time the CPUs were occupied for.
+        total_cpu_time (int):
+            Sum of `user_cpu_time` and `system_cpu_time`, in seconds.
+        user_cpu_time (int):
+            Total amount of time spent in user space, in seconds.
+        system_cpu_time (int):
+            Total amount of time spent in kernel space, in seconds.
+        cpus (int):
+            Total number of CPUs.
+        nodes (int):
+            Total number of nodes.
+        memory (int):
+            Total amount of requested memory, in Mebibytes.
+    """
+    cdef public:
+        consumed_energy
+        disk_read
+        disk_write
+        page_faults
+        resident_memory
+        virtual_memory
+        elapsed_cpu_time
+        total_cpu_time
+        user_cpu_time
+        system_cpu_time
+        cpus
+        nodes
+        memory


 cdef class Job:
@@ -252,7 +307,7 @@ cdef class Job:
             Amount of CPUs the Job has/had allocated, or, if the Job is still
             pending, this will reflect the amount requested.
         memory (int):
-            Amount of memory the Job requested in total
+            Amount of memory the Job requested in total, in Mebibytes
         reservation (str):
             Name of the Reservation for this Job
         script (str):
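To make the new filter option above concrete, here is a minimal usage sketch. The names (`truncate_time`, `start_time`, `end_time`, `Jobs.load`) come from this diff; the concrete window values and the string timestamp format are illustrative assumptions:

```python
import pyslurm

# Like `sacct -T`: usage of jobs that started before / ended after the
# requested window is truncated to the window boundaries.
# (The date values and their exact format here are only illustrative.)
db_filter = pyslurm.db.JobFilter(
    start_time="2023-07-01T00:00:00",
    end_time="2023-07-02T00:00:00",
    truncate_time=True,
)
jobs = pyslurm.db.Jobs.load(db_filter)
```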
diff --git a/pyslurm/db/job.pyx b/pyslurm/db/job.pyx
index beea1861..0457e1fa 100644
--- a/pyslurm/db/job.pyx
+++ b/pyslurm/db/job.pyx
@@ -29,6 +29,10 @@ from typing import Any
 from pyslurm.utils.uint import *
 from pyslurm.settings import LOCAL_CLUSTER
 from pyslurm import xcollections
+from pyslurm.db.stats import (
+    reset_stats_for_job_collection,
+    add_stats_to_job_collection,
+)
 from pyslurm.utils.ctime import (
     date_to_timestamp,
     timestr_to_mins,
@@ -146,6 +150,9 @@ cdef class JobFilter:

         if self.nodelist:
             cstr.fmalloc(&ptr.used_nodes, nodelist_to_range_str(self.nodelist))
+
+        if self.truncate_time:
+            ptr.flags &= ~slurm.JOBCOND_FLAG_NO_TRUNC

         if self.ids:
             # These are only allowed by the slurmdbd when specific jobs are
@@ -196,6 +203,7 @@ cdef class Jobs(MultiClusterMap):
                          val_type=Job,
                          id_attr=Job.id,
                          key_type=int)
+        self._reset_stats()

     @staticmethod
     def load(JobFilter db_filter=None, Connection db_connection=None):
@@ -275,15 +283,35 @@ cdef class Jobs(MultiClusterMap):
             job = Job.from_ptr(job_ptr.data)
             job.qos_data = qos_data
             job._create_steps()
-            JobStatistics._sum_step_stats_for_job(job, job.steps)
+            job.stats = JobStatistics.from_job_steps(job)

             cluster = job.cluster
             if cluster not in out.data:
                 out.data[cluster] = {}
             out[cluster][job.id] = job

+            add_stats_to_job_collection(out, job.stats)
+            out.cpus += job.cpus
+            out.nodes += job.num_nodes
+            out.memory += job.memory
+
         return out

+    def _reset_stats(self):
+        reset_stats_for_job_collection(self)
+        self.cpus = 0
+        self.nodes = 0
+        self.memory = 0
+
+    def calc_stats(self):
+        """(Re)Calculate Statistics for the Job Collection."""
+        self._reset_stats()
+        for job in self.values():
+            add_stats_to_job_collection(self, job.stats)
+            self.cpus += job.cpus
+            self.nodes += job.num_nodes
+            self.memory += job.memory
+
     @staticmethod
     def modify(db_filter, Job changes, db_connection=None):
         """Modify Slurm database Jobs.
@@ -445,7 +473,6 @@ cdef class Job:
         cdef Job wrap = Job.__new__(Job)
         wrap.ptr = in_ptr
         wrap.steps = JobSteps.__new__(JobSteps)
-        wrap.stats = JobStatistics()
         return wrap

     @staticmethod
@@ -738,7 +765,7 @@ cdef class Job:
         else:
             # Job is still pending, so we return the number of requested cpus
             # instead.
-            return u32_parse(self.ptr.req_cpus)
+            return u32_parse(self.ptr.req_cpus, on_noval=0, zero_is_noval=False)

     @property
     def memory(self):
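The `job.pyx` changes above fill the collection-level counters once during `load()` and expose `calc_stats()` for recomputation. A short sketch of the resulting API, using only names added in this diff:

```python
import pyslurm

jobs = pyslurm.db.Jobs.load()

# Totals are summed up once while loading:
print(jobs.cpus, jobs.nodes, jobs.memory)
print(jobs.total_cpu_time, jobs.elapsed_cpu_time)

# If the collection is modified afterwards, rebuild the totals:
jobs.calc_stats()
```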
diff --git a/pyslurm/db/stats.pxd b/pyslurm/db/stats.pxd
index 1ca9c701..5615b2c3 100644
--- a/pyslurm/db/stats.pxd
+++ b/pyslurm/db/stats.pxd
@@ -139,6 +139,9 @@ cdef class JobStatistics:
         user_cpu_time
         system_cpu_time

+    @staticmethod
+    cdef JobStatistics from_job_steps(Job job)
+
     @staticmethod
     cdef JobStatistics from_step(JobStep step)
diff --git a/pyslurm/db/stats.pyx b/pyslurm/db/stats.pyx
index 7bbb2a8a..c2da1145 100644
--- a/pyslurm/db/stats.pyx
+++ b/pyslurm/db/stats.pyx
@@ -28,6 +28,32 @@ from pyslurm.utils.helpers import (
 )


+def reset_stats_for_job_collection(jobs):
+    jobs.consumed_energy = 0
+    jobs.disk_read = 0
+    jobs.disk_write = 0
+    jobs.page_faults = 0
+    jobs.resident_memory = 0
+    jobs.virtual_memory = 0
+    jobs.elapsed_cpu_time = 0
+    jobs.total_cpu_time = 0
+    jobs.user_cpu_time = 0
+    jobs.system_cpu_time = 0
+
+
+def add_stats_to_job_collection(jobs, JobStatistics js):
+    jobs.consumed_energy += js.consumed_energy
+    jobs.disk_read += js.avg_disk_read
+    jobs.disk_write += js.avg_disk_write
+    jobs.page_faults += js.avg_page_faults
+    jobs.resident_memory += js.avg_resident_memory
+    jobs.virtual_memory += js.avg_virtual_memory
+    jobs.elapsed_cpu_time += js.elapsed_cpu_time
+    jobs.total_cpu_time += js.total_cpu_time
+    jobs.user_cpu_time += js.user_cpu_time
+    jobs.system_cpu_time += js.system_cpu_time
+
+
 cdef class JobStatistics:

     def __init__(self):
@@ -50,6 +76,21 @@ cdef class JobStatistics:
     def to_dict(self):
         return instance_to_dict(self)

+    @staticmethod
+    cdef JobStatistics from_job_steps(Job job):
+        cdef JobStatistics job_stats = JobStatistics()
+
+        for step in job.steps.values():
+            job_stats._add_base_stats(step.stats)
+
+        job_stats._sum_cpu_time(job)
+
+        step_count = len(job.steps)
+        if step_count:
+            job_stats.avg_cpu_frequency /= step_count
+
+        return job_stats
+
     @staticmethod
     cdef JobStatistics from_step(JobStep step):
         cdef JobStatistics wrap = JobStatistics()
@@ -140,68 +181,56 @@ cdef class JobStatistics:

         return wrap

-    @staticmethod
-    def _sum_step_stats_for_job(Job job, JobSteps steps):
-        cdef:
-            JobStatistics job_stats = job.stats
-            JobStatistics step_stats = None
-
-        for step in steps.values():
-            step_stats = step.stats
-
-            job_stats.consumed_energy += step_stats.consumed_energy
-            job_stats.avg_cpu_time += step_stats.avg_cpu_time
-            job_stats.avg_cpu_frequency += step_stats.avg_cpu_frequency
-            job_stats.avg_disk_read += step_stats.avg_disk_read
-            job_stats.avg_disk_write += step_stats.avg_disk_write
-            job_stats.avg_page_faults += step_stats.avg_page_faults
-
-            if step_stats.max_disk_read >= job_stats.max_disk_read:
-                job_stats.max_disk_read = step_stats.max_disk_read
-                job_stats.max_disk_read_node = step_stats.max_disk_read_node
-                job_stats.max_disk_read_task = step_stats.max_disk_read_task
-
-            if step_stats.max_disk_write >= job_stats.max_disk_write:
-                job_stats.max_disk_write = step_stats.max_disk_write
-                job_stats.max_disk_write_node = step_stats.max_disk_write_node
-                job_stats.max_disk_write_task = step_stats.max_disk_write_task
-
-            if step_stats.max_page_faults >= job_stats.max_page_faults:
-                job_stats.max_page_faults = step_stats.max_page_faults
-                job_stats.max_page_faults_node = step_stats.max_page_faults_node
-                job_stats.max_page_faults_task = step_stats.max_page_faults_task
-
-            if step_stats.max_resident_memory >= job_stats.max_resident_memory:
-                job_stats.max_resident_memory = step_stats.max_resident_memory
-                job_stats.max_resident_memory_node = step_stats.max_resident_memory_node
-                job_stats.max_resident_memory_task = step_stats.max_resident_memory_task
-                job_stats.avg_resident_memory = job_stats.max_resident_memory
-
-            if step_stats.max_virtual_memory >= job_stats.max_virtual_memory:
-                job_stats.max_virtual_memory = step_stats.max_virtual_memory
-                job_stats.max_virtual_memory_node = step_stats.max_virtual_memory_node
-                job_stats.max_virtual_memory_task = step_stats.max_virtual_memory_task
-                job_stats.avg_virtual_memory = job_stats.max_virtual_memory
-
-            if step_stats.min_cpu_time >= job_stats.min_cpu_time:
-                job_stats.min_cpu_time = step_stats.min_cpu_time
-                job_stats.min_cpu_time_node = step_stats.min_cpu_time_node
-                job_stats.min_cpu_time_task = step_stats.min_cpu_time_task
-
+    def _add_base_stats(self, JobStatistics src):
+        self.consumed_energy += src.consumed_energy
+        self.avg_cpu_time += src.avg_cpu_time
+        self.avg_cpu_frequency += src.avg_cpu_frequency
+        self.avg_disk_read += src.avg_disk_read
+        self.avg_disk_write += src.avg_disk_write
+        self.avg_page_faults += src.avg_page_faults
+
+        if src.max_disk_read >= self.max_disk_read:
+            self.max_disk_read = src.max_disk_read
+            self.max_disk_read_node = src.max_disk_read_node
+            self.max_disk_read_task = src.max_disk_read_task
+
+        if src.max_disk_write >= self.max_disk_write:
+            self.max_disk_write = src.max_disk_write
+            self.max_disk_write_node = src.max_disk_write_node
+            self.max_disk_write_task = src.max_disk_write_task
+
+        if src.max_page_faults >= self.max_page_faults:
+            self.max_page_faults = src.max_page_faults
+            self.max_page_faults_node = src.max_page_faults_node
+            self.max_page_faults_task = src.max_page_faults_task
+
+        if src.max_resident_memory >= self.max_resident_memory:
+            self.max_resident_memory = src.max_resident_memory
+            self.max_resident_memory_node = src.max_resident_memory_node
+            self.max_resident_memory_task = src.max_resident_memory_task
+            self.avg_resident_memory = self.max_resident_memory
+
+        if src.max_virtual_memory >= self.max_virtual_memory:
+            self.max_virtual_memory = src.max_virtual_memory
+            self.max_virtual_memory_node = src.max_virtual_memory_node
+            self.max_virtual_memory_task = src.max_virtual_memory_task
+            self.avg_virtual_memory = self.max_virtual_memory
+
+        if src.min_cpu_time >= self.min_cpu_time:
+            self.min_cpu_time = src.min_cpu_time
+            self.min_cpu_time_node = src.min_cpu_time_node
+            self.min_cpu_time_task = src.min_cpu_time_task
+
+    def _sum_cpu_time(self, Job job):
         if job.ptr.tot_cpu_sec != slurm.NO_VAL64:
-            job_stats.total_cpu_time = job.ptr.tot_cpu_sec
+            self.total_cpu_time += job.ptr.tot_cpu_sec

         if job.ptr.user_cpu_sec != slurm.NO_VAL64:
-            job_stats.user_cpu_time = job.ptr.user_cpu_sec
+            self.user_cpu_time += job.ptr.user_cpu_sec

         if job.ptr.sys_cpu_sec != slurm.NO_VAL64:
-            job_stats.system_cpu_time = job.ptr.sys_cpu_sec
+            self.system_cpu_time += job.ptr.sys_cpu_sec

         elapsed = job.elapsed_time if job.elapsed_time else 0
         cpus = job.cpus if job.cpus else 0
-        job_stats.elapsed_cpu_time = elapsed * cpus
-
-        step_count = len(steps)
-        if step_count:
-            job_stats.avg_cpu_frequency /= step_count
+        self.elapsed_cpu_time += elapsed * cpus
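Since `JobStatistics.from_job_steps()` now attaches a statistics object to every job during load, per-job numbers remain available alongside the collection totals. A final sketch, again using only names from this diff (`to_dict()` is the serializer already defined in `stats.pyx`):

```python
import pyslurm

jobs = pyslurm.db.Jobs.load()
for job in jobs.values():
    # Per-job statistics, built via JobStatistics.from_job_steps() at load time
    print(job.id, job.stats.elapsed_cpu_time)
    print(job.stats.to_dict())
```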