Skip to content

Commit b997755

Browse files
JonaOtto authored and jgoodson committed
Wait_finished method for job API (regarding #240) (#242)
* Fix introduced typo in partition information dictionary key. (#241) * Added wait_finished method to job class (#240). * Added test method for wait_finished method of the job class. * Added _load_single_job method to the job class to extract the slurm_load_job functionality. * Updated find_id and wait_finished to use _load_single_job. Co-authored-by: Jonathan Goodson <jonathan.goodson@gmail.com>
1 parent 4e61c8c commit b997755

File tree

2 files changed

+140
-12
lines changed

2 files changed

+140
-12
lines changed

pyslurm/pyslurm.pyx

Lines changed: 64 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1778,35 +1778,55 @@ cdef class job:
17781778

17791779
return retList
17801780

1781-
def find_id(self, jobid):
1782-
"""Retrieve job ID data.
1781+
cdef _load_single_job(self, jobid):
1782+
"""
1783+
Uses slurm_load_job to set up self._job_ptr for the single job given by the jobid.
1784+
After calling this, the job pointer can be used in other methods
1785+
to operate on the job's information.
17831786
1784-
This method accepts both string and integer formats of the jobid. It
1785-
calls slurm_xlate_job_id() to convert the jobid appropriately.
1786-
This works for single jobs and job arrays.
1787+
This method accepts both string and integer formats of the jobid. It
1788+
calls slurm_xlate_job_id to convert the jobid appropriately.
17871789
1788-
:param str jobid: Job id key string to search
1789-
:returns: List of dictionary of values for given job id
1790-
:rtype: `list`
1790+
Raises a ValueError if the jobid does not correspond to an existing job.
1791+
1792+
:param str jobid: The jobid
1793+
:returns: void
1794+
:rtype: None.
17911795
"""
17921796
cdef:
17931797
int apiError
17941798
int rc
17951799

1800+
# jobid can be given as int or string
17961801
if isinstance(jobid, int) or isinstance(jobid, long):
17971802
jobid = str(jobid).encode("UTF-8")
17981803
else:
17991804
jobid = jobid.encode("UTF-8")
1800-
1805+
# convert jobid appropriately for slurm
18011806
jobid_xlate = slurm.slurm_xlate_job_id(jobid)
1807+
1808+
# load the job which sets the self._job_ptr pointer
18021809
rc = slurm.slurm_load_job(&self._job_ptr, jobid_xlate, self._ShowFlags)
18031810

1804-
if rc == slurm.SLURM_SUCCESS:
1805-
return list(self.get_job_ptr().values())
1806-
else:
1811+
if rc != slurm.SLURM_SUCCESS:
18071812
apiError = slurm.slurm_get_errno()
18081813
raise ValueError(slurm.stringOrNone(slurm.slurm_strerror(apiError), ''), apiError)
18091814

1815+
def find_id(self, jobid):
1816+
"""Retrieve job ID data.
1817+
1818+
This method accepts both string and integer formats of the jobid.
1819+
This works for single jobs and job arrays. It uses the internal
1820+
helper _load_single_job to do slurm_load_job. If the job corresponding
1821+
to the jobid does not exist, a ValueError will be raised.
1822+
1823+
:param str jobid: Job id key string to search
1824+
:returns: List of dictionary of values for given job id
1825+
:rtype: `list`
1826+
"""
1827+
self._load_single_job(jobid)
1828+
return list(self.get_job_ptr().values())
1829+
18101830
def find_user(self, user):
18111831
"""Retrieve a user's job data.
18121832
@@ -2883,6 +2903,38 @@ cdef class job:
28832903
#return "Submitted batch job %s" % job_id
28842904
return job_id
28852905

2906+
def wait_finished(self, jobid):
    """Block until the job given by the jobid finishes.

    The job is polled in rounds: each round sleeps for 5 seconds,
    reloads the job through ``_load_single_job`` and inspects every
    loaded record. Polling stops after the first round in which all
    records are reported as completed by ``IS_JOB_COMPLETED``.
    This works for single jobs, as well as job arrays.

    :param jobid: The job id of the slurm job.
        To reference a job with job array set, use the first/"master"
        jobid (the same as given by squeue).
    :returns: The highest exit status among the completed records.
        A record for which ``WIFEXITED`` is false counts as 1. The
        initial value -9999 is returned unchanged if no record was
        ever evaluated.
    :rtype: `int`
    """
    highest_status = -9999
    all_done = False
    while not all_done:
        p_time.sleep(5)
        self._load_single_job(jobid)

        # Assume the job is done; any unfinished record resets the flag
        # so that another polling round is started.
        all_done = True
        for idx in range(self._job_ptr.record_count):
            self._record = &self._job_ptr.job_array[idx]
            if not IS_JOB_COMPLETED(self._job_ptr.job_array[idx]):
                all_done = False
                continue

            if WIFEXITED(self._record.exit_code):
                record_status = WEXITSTATUS(self._record.exit_code)
            else:
                record_status = 1
            # Keep the highest exit status of all jobs in the job array.
            highest_status = max(highest_status, record_status)

        # NOTE(review): the original indentation is not visible here; the
        # free is placed inside the polling loop so that every message
        # loaded by _load_single_job is released before the next reload.
        # Confirm against the repository.
        slurm.slurm_free_job_info_msg(self._job_ptr)
    return highest_status
2937+
28862938

28872939
def slurm_pid2jobid(uint32_t JobPID=0):
28882940
"""Get the slurm job id from a process id.

tests/test_job.py

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,3 +110,79 @@ def test_job_kill():
110110
# time.sleep(3)
111111
# test_job_search_after = pyslurm.job().find_id(test_job_id)[0]
112112
# assert_equals(test_job_search_after.get("job_state"), "FAILED")
113+
114+
115+
def test_job_wait_finished():
    """Job: Test job().wait_finished()."""
    # Each scenario: (wrapped command, expected final state, expected exit code)
    scenarios = (
        ("sleep 30", "COMPLETED", 0),
        # "exit 1" should yield failure ending
        ("sleep 300; exit 1", "FAILED", 1),
    )

    for wrap, expected_state, expected_code in scenarios:
        test_job = {
            "wrap": wrap,
            "job_name": "pyslurm_test_job",
            "ntasks": 1,
            "cpus_per_task": 1,
        }
        test_job_id = pyslurm.job().submit_batch_job(test_job)
        start_job_state = pyslurm.job().find_id(test_job_id)[0]["job_state"]

        # Block until Slurm reports the job as finished.
        exit_code = pyslurm.job().wait_finished(test_job_id)

        end_job_state = pyslurm.job().find_id(test_job_id)[0]["job_state"]
        assert start_job_state != "COMPLETED"
        assert end_job_state == expected_state
        assert exit_code == expected_code
151+
152+
153+
def test_job_wait_finished_w_arrays():
    """Job: Test job().wait_finished() with job arrays."""
    # Each scenario: (wrapped command, expected final state, expected exit code)
    scenarios = (
        ("sleep 30; exit 0", "COMPLETED", 0),
        # Use the array ID as exit code to yield different exit codes: 0, 1, 2.
        # Exit code 2 (the maximum of all) should yield FAILED for the
        # entire job.
        ("sleep 30; exit $SLURM_ARRAY_TASK_ID", "FAILED", 2),
    )

    for wrap, expected_state, expected_code in scenarios:
        test_job = {
            "wrap": wrap,
            "job_name": "pyslurm_array_test_job",
            "ntasks": 1,
            "cpus_per_task": 1,
            "array_inx": "0,1,2",
        }
        test_job_id = pyslurm.job().submit_batch_job(test_job)
        start_job_state = pyslurm.job().find_id(test_job_id)[0]["job_state"]

        # Block until every task of the array has finished.
        exit_code = pyslurm.job().wait_finished(test_job_id)

        end_job_state = pyslurm.job().find_id(test_job_id)[0]["job_state"]
        assert start_job_state != "COMPLETED"
        assert end_job_state == expected_state
        assert exit_code == expected_code

0 commit comments

Comments
 (0)