diff --git a/pyslurm/pyslurm.pyx b/pyslurm/pyslurm.pyx index f6bad7cc..405a1bcf 100644 --- a/pyslurm/pyslurm.pyx +++ b/pyslurm/pyslurm.pyx @@ -602,7 +602,7 @@ cdef class config: Ctl_dict['cpu_freq_govs'] = self.__Config_ptr.cpu_freq_govs Ctl_dict['cred_type'] = slurm.stringOrNone(self.__Config_ptr.cred_type, '') Ctl_dict['debug_flags'] = self.__Config_ptr.debug_flags - Ctl_dict['def_mem_per_cp'] = self.__Config_ptr.def_mem_per_cpu + Ctl_dict['def_mem_per_cpu'] = self.__Config_ptr.def_mem_per_cpu Ctl_dict['dependency_params'] = slurm.stringOrNone(self.__Config_ptr.dependency_params, '') Ctl_dict['eio_timeout'] = self.__Config_ptr.eio_timeout Ctl_dict['enforce_part_limits'] = bool(self.__Config_ptr.enforce_part_limits) @@ -1027,16 +1027,16 @@ cdef class partition: if record.def_mem_per_cpu & slurm.MEM_PER_CPU: if record.def_mem_per_cpu == slurm.MEM_PER_CPU: - Part_dict['def_mem_per_cp'] = "UNLIMITED" + Part_dict['def_mem_per_cpu'] = "UNLIMITED" Part_dict['def_mem_per_node'] = None else: - Part_dict['def_mem_per_cp'] = record.def_mem_per_cpu & (~slurm.MEM_PER_CPU) + Part_dict['def_mem_per_cpu'] = record.def_mem_per_cpu & (~slurm.MEM_PER_CPU) Part_dict['def_mem_per_node'] = None elif record.def_mem_per_cpu == 0: - Part_dict['def_mem_per_cp'] = None + Part_dict['def_mem_per_cpu'] = None Part_dict['def_mem_per_node'] = "UNLIMITED" else: - Part_dict['def_mem_per_cp'] = None + Part_dict['def_mem_per_cpu'] = None Part_dict['def_mem_per_node'] = record.def_mem_per_cpu if record.default_time == slurm.INFINITE: @@ -1778,35 +1778,55 @@ cdef class job: return retList - def find_id(self, jobid): - """Retrieve job ID data. + cdef _load_single_job(self, jobid): + """ + Uses slurm_load_job to set up the self._job_ptr for a single job given by the jobid. + After calling this, the job pointer can be used in other methods + to operate on the information of the job. - This method accepts both string and integer formats of the jobid. 
It - calls slurm_xlate_job_id() to convert the jobid appropriately. - This works for single jobs and job arrays. + This method accepts both string and integer formats of the jobid. It + calls slurm_xlate_job_id to convert the jobid appropriately. - :param str jobid: Job id key string to search - :returns: List of dictionary of values for given job id - :rtype: `list` + Raises a ValueError if the jobid does not correspond to an existing job. + + :param str jobid: The jobid + :returns: void + :rtype: None. """ cdef: int apiError int rc + # jobid can be given as int or string if isinstance(jobid, int) or isinstance(jobid, long): jobid = str(jobid).encode("UTF-8") else: jobid = jobid.encode("UTF-8") - + # convert jobid appropriately for slurm jobid_xlate = slurm.slurm_xlate_job_id(jobid) + + # load the job which sets the self._job_ptr pointer rc = slurm.slurm_load_job(&self._job_ptr, jobid_xlate, self._ShowFlags) - if rc == slurm.SLURM_SUCCESS: - return list(self.get_job_ptr().values()) - else: + if rc != slurm.SLURM_SUCCESS: apiError = slurm.slurm_get_errno() raise ValueError(slurm.stringOrNone(slurm.slurm_strerror(apiError), ''), apiError) + def find_id(self, jobid): + """Retrieve job ID data. + + This method accepts both string and integer formats of the jobid. + This works for single jobs and job arrays. It uses the internal + helper _load_single_job to do slurm_load_job. If the job corresponding + to the jobid does not exist, a ValueError will be raised. + + :param str jobid: Job id key string to search + :returns: List of dictionary of values for given job id + :rtype: `list` + """ + self._load_single_job(jobid) + return list(self.get_job_ptr().values()) + def find_user(self, user): """Retrieve a user's job data. @@ -2883,6 +2903,38 @@ cdef class job: #return "Submitted batch job %s" % job_id return job_id + def wait_finished(self, jobid): + """ + Block until the job given by the jobid finishes. + This works for single jobs, as well as job arrays. 
+ :param jobid: The job id of the slurm job. + To reference a job with job array set, use the first/"master" jobid + (the same as given by squeue) + :returns: The exit code of the slurm job. + :rtype: `int` + """ + exit_status = -9999 + complete = False + while not complete: + complete = True + p_time.sleep(5) + self._load_single_job(jobid) + for i in range(0, self._job_ptr.record_count): + self._record = &self._job_ptr.job_array[i] + if IS_JOB_COMPLETED(self._job_ptr.job_array[i]): + exit_status_arrayjob = None + if WIFEXITED(self._record.exit_code): + exit_status_arrayjob = WEXITSTATUS(self._record.exit_code) + else: + exit_status_arrayjob = 1 + # set exit code to the highest of all jobs in job array + exit_status = max([exit_status, exit_status_arrayjob]) + else: + # go on with the next iteration, until all jobs in array are completed + complete = False + slurm.slurm_free_job_info_msg(self._job_ptr) + return exit_status + def slurm_pid2jobid(uint32_t JobPID=0): """Get the slurm job id from a process id. 
diff --git a/tests/test_job.py b/tests/test_job.py index efb16c09..e11bb65c 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -110,3 +110,79 @@ def test_job_kill(): # time.sleep(3) # test_job_search_after = pyslurm.job().find_id(test_job_id)[0] # assert_equals(test_job_search_after.get("job_state"), "FAILED") + + +def test_job_wait_finished(): + """Job: Test job().wait_finished().""" + test_job = { + "wrap": "sleep 30", + "job_name": "pyslurm_test_job", + "ntasks": 1, + "cpus_per_task": 1, + } + test_job_id = pyslurm.job().submit_batch_job(test_job) + start_job_state = pyslurm.job().find_id(test_job_id)[0]["job_state"] + + # wait for the job to finish + exit_code = pyslurm.job().wait_finished(test_job_id) + + end_job_state = pyslurm.job().find_id(test_job_id)[0]["job_state"] + assert start_job_state != "COMPLETED" + assert end_job_state == "COMPLETED" + assert exit_code == 0 + + # test again with another wrap + test_job = { + "wrap": "sleep 300; exit 1", # "exit 1" should yield failure ending + "job_name": "pyslurm_test_job", + "ntasks": 1, + "cpus_per_task": 1, + } + test_job_id = pyslurm.job().submit_batch_job(test_job) + start_job_state = pyslurm.job().find_id(test_job_id)[0]["job_state"] + + # wait for the job to finish + exit_code = pyslurm.job().wait_finished(test_job_id) + + end_job_state = pyslurm.job().find_id(test_job_id)[0]["job_state"] + assert start_job_state != "COMPLETED" + assert end_job_state == "FAILED" + assert exit_code == 1 + + +def test_job_wait_finished_w_arrays(): + """Job: Test job().wait_finished() with job arrays.""" + test_job = { + "wrap": "sleep 30; exit 0", + "job_name": "pyslurm_array_test_job", + "ntasks": 1, + "cpus_per_task": 1, + "array_inx": "0,1,2", + } + test_job_id = pyslurm.job().submit_batch_job(test_job) + start_job_state = pyslurm.job().find_id(test_job_id)[0]["job_state"] + # wait for the job to finish + exit_code = pyslurm.job().wait_finished(test_job_id) + end_job_state = 
pyslurm.job().find_id(test_job_id)[0]["job_state"] + assert start_job_state != "COMPLETED" + assert end_job_state == "COMPLETED" + assert exit_code == 0 + + # test for exit codes: maximum exit code of all array jobs + test_job = { + # use array ID as exit code to yield different exit codes: 0, 1, 2 + "wrap": "sleep 30; exit $SLURM_ARRAY_TASK_ID", + "job_name": "pyslurm_array_test_job", + "ntasks": 1, + "cpus_per_task": 1, + "array_inx": "0,1,2", + } + test_job_id = pyslurm.job().submit_batch_job(test_job) + start_job_state = pyslurm.job().find_id(test_job_id)[0]["job_state"] + # wait for the job to finish + exit_code = pyslurm.job().wait_finished(test_job_id) + end_job_state = pyslurm.job().find_id(test_job_id)[0]["job_state"] + assert start_job_state != "COMPLETED" + # exit code 2 (the maximum of all) should yield FAILED for the entire job + assert end_job_state == "FAILED" + assert exit_code == 2