Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhancement to scheduler logic and code refactoring. #1739

Merged
merged 10 commits into from
Apr 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion buildtest/executors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ def gather(self, builder):

builder.record_endtime()

builder.metadata["job"] = builder.job.gather()
builder.metadata["job"] = builder.job.jobdata()
builder.metadata["result"]["returncode"] = builder.job.exitcode()

self.logger.debug(
Expand Down
26 changes: 12 additions & 14 deletions buildtest/executors/cobalt.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,9 @@ def run(self, builder):
logger.debug(f"Output file will be written to: {builder.metadata['outfile']}")
logger.debug(f"Error file will be written to: {builder.metadata['errfile']}")

builder.metadata["job"] = builder.job.gather()
# gather job record
builder.job.retrieve_jobdata()
builder.metadata["job"] = builder.job.jobdata()
logger.debug(json.dumps(builder.metadata["job"], indent=2))

return builder
Expand All @@ -129,20 +131,16 @@ def poll(self, builder):
return

builder.stop()
# if job is pending or suspended check if builder timer duration exceeds maxpendtime if so cancel job
if builder.job.is_pending() or builder.job.is_suspended():
logger.debug(f"Time Duration: {builder.duration}")
logger.debug(f"Max Pend Time: {self.maxpendtime}")

# if timer time is more than requested pend time then cancel job
if int(builder.timer.duration()) > self.maxpendtime:
builder.job.cancel()
builder.failed()
console.print(
f"[blue]{builder}[/]: [red]Cancelling Job {builder.job.get()} because job exceeds max pend time of {self.maxpendtime} sec with current pend time of {builder.timer.duration()} sec[/red] "
)
return

if builder.job.is_running():
builder.job.elapsedtime = time.time() - builder.job.starttime
builder.job.elapsedtime = round(builder.job.elapsedtime, 2)
if self._cancel_job_if_elapsedtime_exceeds_timeout(builder):
return

if builder.job.is_suspended() or builder.job.is_pending():
if self._cancel_job_if_pendtime_exceeds_maxpendtime(builder):
return
builder.start()

def gather(self, builder):
Expand Down
12 changes: 5 additions & 7 deletions buildtest/executors/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ def run(self, builder):

msg = f"[blue]{builder}[/blue]: JobID {builder.metadata['jobid']} dispatched to scheduler"
console.print(msg)

builder.job.get_output_and_error_files()
self.logger.debug(msg)

return builder
Expand All @@ -120,20 +122,16 @@ def gather(self, builder):
"""
builder.record_endtime()

builder.metadata["job"] = builder.job.gather()
builder.metadata["job"] = builder.job.jobdata()

builder.metadata["result"]["returncode"] = builder.job.exitcode()

self.logger.debug(
f"[{builder.name}] returncode: {builder.metadata['result']['returncode']}"
)

builder.metadata["outfile"] = os.path.join(
builder.job.workdir(), builder.name + ".out"
)
builder.metadata["errfile"] = os.path.join(
builder.job.workdir(), builder.name + ".err"
)
builder.metadata["outfile"] = builder.job.output_file()
builder.metadata["errfile"] = builder.job.error_file()

console.print(f"[blue]{builder}[/]: Job {builder.job.get()} is complete! ")
builder.post_run_steps()
25 changes: 7 additions & 18 deletions buildtest/scheduler/cobalt.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import time

from buildtest.scheduler.job import Job
from buildtest.utils.command import BuildTestCommand
Expand Down Expand Up @@ -55,21 +56,6 @@ def cobalt_log(self):

return self._cobaltlog

def output_file(self):
"""Return job output file"""

return self._outfile

def error_file(self):
"""Return job error file"""

return self._errfile

def exitcode(self):
"""Return job exit code"""

return self._exitcode

def poll(self):
"""Poll job by running ``qstat -l --header State <jobid>`` which retrieves job state."""

Expand All @@ -88,9 +74,12 @@ def poll(self):
if job_state:
self._state = job_state

logger.debug(f"Job ID: '{self.job}' Job State: {self._state}")
logger.debug(f"Job ID: '{self.jobid}' Job State: {self._state}")

if self.is_running() and not self.starttime:
self.starttime = time.time()

def gather(self):
def retrieve_jobdata(self):
"""Gather Job state by running **qstat -lf <jobid>** which retrieves all fields.
The output is in text format which is parsed into key/value pair and stored in a dictionary. This method will
return a dict containing the job record
Expand Down Expand Up @@ -123,7 +112,7 @@ def gather(self):
value = value.strip()
job_record[key] = value

return job_record
self._jobdata = job_record

def cancel(self):
"""Cancel job by running ``qdel <jobid>``. This method is called if job timer exceeds
Expand Down
26 changes: 26 additions & 0 deletions buildtest/scheduler/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ class Job:
def __init__(self, jobID):
self.jobid = jobID
self._state = None
self._outfile = None
self._errfile = None
self._exitcode = None
self._jobdata = None
# used to store the job elapsed time
self.elapsedtime = 0

Expand Down Expand Up @@ -47,3 +51,25 @@ def cancel(self):
def poll(self):
"""Poll job and update job state."""
raise NotImplementedError

def get_output_and_error_files(self):
"""Get output and error of job"""
raise NotImplementedError

def output_file(self):
"""Return output file of job"""
return self._outfile

def error_file(self):
"""Return error file of job"""
return self._errfile

def exitcode(self):
"""Return exit code of job"""
return self._exitcode

def retrieve_jobdata(self):
raise NotImplementedError

def jobdata(self):
return self._jobdata
99 changes: 34 additions & 65 deletions buildtest/scheduler/lsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,6 @@ def is_failed(self):

return self._state == "EXIT"

def output_file(self):
"""Return job output file"""

return self._outfile

def error_file(self):
"""Return job error file"""

return self._errfile

def exitcode(self):
"""Return job exit code"""

return self._exitcode

def poll(self):
"""Given a job id we poll the LSF Job by retrieving its job state, output file, error file and exit code.
We run the following commands to retrieve following states
Expand Down Expand Up @@ -94,40 +79,42 @@ def poll(self):
if self.is_running() and not self.starttime:
self.starttime = time.time()

def gather(self):
"""This method will retrieve the output and error file for a given jobID using the following commands.
def get_output_and_error_files(self):
"""This method will extract output and error file for a given jobID by running the following commands:
``bjobs -noheader -o 'output_file' <JOBID>`` and ``bjobs -noheader -o 'error_file' <JOBID>``

.. code-block:: console
.. code-block:: console

$ bjobs -noheader -o 'output_file' 70910
hold_job.out
$ bjobs -noheader -o 'output_file' 70910
hold_job.out

.. code-block:: console
.. code-block:: console

$ bjobs -noheader -o 'error_file' 70910
hold_job.err
"""
# get path to output file
query = f"bjobs -noheader -o 'output_file' {self.jobid} "
logger.debug(
f"Extracting OUTPUT FILE for job: {self.jobid} by running '{query}'"
)
cmd = BuildTestCommand(query)
cmd.execute()
self._outfile = "".join(cmd.get_output()).rstrip()
logger.debug(f"Output File: {self._outfile}")

# get path to error file
query = f"bjobs -noheader -o 'error_file' {self.jobid} "
logger.debug(
f"Extracting ERROR FILE for job: {self.jobid} by running '{query}'"
)
cmd = BuildTestCommand(query)
cmd.execute()
self._errfile = "".join(cmd.get_output()).rstrip()
logger.debug(f"Error File: {self._errfile}")

$ bjobs -noheader -o 'error_file' 70910
hold_job.err

We will gather job record at onset of job completion by running ``bjobs -o '<format1> <format2>' <jobid> -json``. The format
fields extracted from job are the following:

- "job_name"
- "stat"
- "user"
- "user_group"
- "queue"
- "proj_name"
- "pids"
- "exit_code"
- "from_host"
- "exec_host"
- "submit_time"
- "start_time"
- "finish_time"
- "nthreads"
- "exec_home"
- "exec_cwd"
- "output_file"
- "error_file"
def retrieve_jobdata(self):
"""We will gather job record at onset of job completion by running ``bjobs -o '<format1> <format2>' <jobid> -json``. T

Shown below is the output format and we retrieve the job records defined in **RECORDS** property

Expand Down Expand Up @@ -162,25 +149,7 @@ def gather(self):
}
"""

# get path to output file
query = f"bjobs -noheader -o 'output_file' {self.jobid} "
logger.debug(
f"Extracting OUTPUT FILE for job: {self.jobid} by running '{query}'"
)
cmd = BuildTestCommand(query)
cmd.execute()
self._outfile = "".join(cmd.get_output()).rstrip()
logger.debug(f"Output File: {self._outfile}")

# get path to error file
query = f"bjobs -noheader -o 'error_file' {self.jobid} "
logger.debug(
f"Extracting ERROR FILE for job: {self.jobid} by running '{query}'"
)
cmd = BuildTestCommand(query)
cmd.execute()
self._errfile = "".join(cmd.get_output()).rstrip()
logger.debug(f"Error File: {self._errfile}")
self.get_output_and_error_files()

format_fields = [
"job_name",
Expand Down Expand Up @@ -219,7 +188,7 @@ def gather(self):
for field, value in records.items():
job_data[field] = value

return job_data
self._jobdata = job_data

def cancel(self):
"""Cancel LSF Job by running ``bkill <jobid>``. This method is called if job pending time exceeds
Expand Down
21 changes: 4 additions & 17 deletions buildtest/scheduler/pbs.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,6 @@ def is_suspended(self):
"""Return ``True`` if job is suspended which would be in one of these states ``H``, ``U``, ``S``."""
return self._state in ["H", "U", "S"]

def output_file(self):
"""Return output file of job"""
return self._outfile

def error_file(self):
"""Return error file of job"""
return self._errfile

def exitcode(self):
"""Return exit code of job"""
return self._exitcode

def success(self):
"""This method determines if job was completed successfully and returns ``True`` if exit code is 0.

Expand All @@ -64,7 +52,7 @@ def fail(self):
"""Return ``True`` if their is a job failure which would be if exit code is not 0"""
return not self.success()

def fetch_output_error_files(self):
def get_output_error_files(self):
"""Fetch output and error files right after job submission."""
query = f"qstat -f {self.jobid}"
cmd = BuildTestCommand(query)
Expand Down Expand Up @@ -96,7 +84,7 @@ def fetch_output_error_files(self):
def is_output_ready(self):
"""Check if the output and error file exists."""
if not self._outfile or not self._errfile:
self.fetch_output_error_files()
self.get_output_error_files()
return os.path.exists(self._outfile) and os.path.exists(self._errfile)

def poll(self):
Expand All @@ -106,8 +94,7 @@ def poll(self):

.. code-block:: console


(buildtest) adaptive50@e4spro-cluster:~/Documents/buildtest/aws_oddc$ qstat -f 40680075.e4spro-cluster
(buildtest) adaptive50@e4spro-cluster:~/Documents/buildtest/aws_oddc$ qstat -f 40680075.e4spro-cluster
Job Id: 40680075.e4spro-cluster
Job_Name = hostname_test
Job_Owner = adaptive50@server.nodus.com
Expand Down Expand Up @@ -259,7 +246,7 @@ def poll(self):
if self.is_running() and not self.starttime:
self.starttime = time.time()

def gather(self):
def retrieve_jobdata(self):
"""This method is called once job is complete. We will gather record of job by running
``qstat -x -f -F json <jobid>`` and return the json object as a dict. This method is responsible
for getting output file, error file and exit status of job.
Expand Down
Loading
Loading