Skip to content

Commit

Permalink
Merge pull request #19 from njzjz/lsf
Browse files Browse the repository at this point in the history
add LSF support and bugfix
  • Loading branch information
amcadmus authored Jul 6, 2019
2 parents 840bf8e + c1b7bd9 commit 6f06104
Show file tree
Hide file tree
Showing 2 changed files with 162 additions and 5 deletions.
130 changes: 127 additions & 3 deletions generator/lib/RemoteJob.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ class JobStatus (Enum) :
running = 3
terminated = 4
finished = 5
unknow = 100
unknown = 100

def _default_item(resources, key, value) :
if key not in resources :
Expand Down Expand Up @@ -394,7 +394,7 @@ def _make_script(self,
ret += 'srun %s %s\n' % (cmd, jj)
else :
ret += '%s %s\n' % (cmd, jj)
if 'allow_failure' not in resources or resources['allow_failure'] is not False:
if 'allow_failure' not in res or res['allow_failure'] is False:
ret += 'test $? -ne 0 && exit\n'
ret += 'cd %s\n' % self.remote_root
ret += 'test $? -ne 0 && exit\n'
Expand Down Expand Up @@ -518,7 +518,7 @@ def _make_script(self,
ret += 'mpirun -machinefile $PBS_NODEFILE -n %d %s %s\n' % (res['numb_node'] * res['task_per_node'], cmd, jj)
else :
ret += '%s %s\n' % (cmd, jj)
if 'allow_failure' not in resources or resources['allow_failure'] is not False:
if 'allow_failure' not in res or res['allow_failure'] is False:
ret += 'test $? -ne 0 && exit\n'
ret += 'cd %s\n' % self.remote_root
ret += 'test $? -ne 0 && exit\n'
Expand Down Expand Up @@ -546,3 +546,127 @@ def _make_script(self,
# # can download dirs and normal files
# rjob.download(['job0', 'job1'], ['a'])
# # rjob.clean()


class LSFJob (RemoteJob) :
    """Remote-job backend for the IBM Spectrum LSF batch scheduler.

    Submits work with ``bsub``, polls state with ``bjobs`` and generates
    ``#BSUB`` submission scripts.  The SSH session (``self.ssh``), the remote
    working directory (``self.remote_root``) and the remote-call helpers
    (``block_call`` / ``block_checkcall``) come from the ``RemoteJob`` base
    class — TODO confirm their exact contracts against that class.
    """
    def submit(self,
               job_dirs,
               cmd,
               args = None,
               resources = None) :
        """Build the submission script, submit it with ``bsub`` and record
        the scheduler job id in the remote file ``job_id``.

        Parameters
        ----------
        job_dirs : list of str
            Task directories (relative to the remote root) to run ``cmd`` in.
        cmd : str
            Command executed in each task directory.
        args : list of str, optional
            Per-task argument strings, zipped with ``job_dirs``.
        resources : dict, optional
            Resource specification forwarded to ``_make_script``.
        """
        script_name = self._make_script(job_dirs, cmd, args, res = resources)
        stdin, stdout, stderr = self.block_checkcall(('cd %s; bsub < %s' % (self.remote_root, script_name)))
        subret = (stdout.readlines())
        # bsub answers e.g. "Job <12345> is submitted to queue <normal>.";
        # the second token is "<12345>", so strip the angle brackets.
        job_id = subret[0].split()[1][1:-1]
        sftp = self.ssh.open_sftp()
        with sftp.open(os.path.join(self.remote_root, 'job_id'), 'w') as fp:
            fp.write(job_id)
        sftp.close()

    def check_status(self) :
        """Return the ``JobStatus`` of the job recorded by :meth:`submit`.

        Raises
        ------
        RuntimeError
            If no job id was recorded, or if ``bjobs`` fails for a reason
            other than the job being unknown to the scheduler.
        """
        job_id = self._get_job_id()
        if job_id == "" :
            raise RuntimeError("job %s has not been submitted" % self.remote_root)
        ret, stdin, stdout, stderr\
            = self.block_call ("bjobs " + job_id)
        err_str = stderr.read().decode('utf-8')
        if ret != 0 :
            # bjobs forgets jobs some time after completion; fall back to the
            # finish-tag file to distinguish success from termination.
            if ("Job <%s> is not found" % job_id) in err_str :
                if self._check_finish_tag() :
                    return JobStatus.finished
                else :
                    return JobStatus.terminated
            else :
                raise RuntimeError ("status command bjobs fails to execute. error info: %s return code %d"
                                    % (err_str, ret))
        # bjobs output: a header line, then one line per job; STAT is column 3.
        status_line = stdout.read().decode('utf-8').split ('\n')[-2]
        status_word = status_line.split ()[2]
        # ref: https://www.ibm.com/support/knowledgecenter/en/SSETD4_9.1.2/lsf_command_ref/bjobs.1.html
        if status_word in ["PEND", "WAIT"] :
            return JobStatus.waiting
        elif status_word in ["RUN"] :
            return JobStatus.running
        elif status_word in ["DONE","EXIT"] :
            if self._check_finish_tag() :
                return JobStatus.finished
            else :
                return JobStatus.terminated
        else :
            return JobStatus.unknown

    def _get_job_id(self) :
        """Read back the job id stored remotely by :meth:`submit`."""
        sftp = self.ssh.open_sftp()
        with sftp.open(os.path.join(self.remote_root, 'job_id'), 'r') as fp:
            ret = fp.read().decode('utf-8')
        sftp.close()
        return ret

    def _check_finish_tag(self) :
        """Return True iff the remote ``tag_finished`` marker file exists."""
        sftp = self.ssh.open_sftp()
        try:
            sftp.stat(os.path.join(self.remote_root, 'tag_finished'))
            ret = True
        except IOError:
            # stat raising IOError means the tag file is absent.
            ret = False
        sftp.close()
        return ret

    def _make_script(self,
                     job_dirs,
                     cmd,
                     args = None,
                     res = None) :
        """Write the ``#BSUB`` submission script to the remote root.

        Returns the script file name (``run.sub``) relative to the remote
        root.  The script cd's into each task directory, runs ``cmd`` (under
        mpirun when ``res['with_mpi']``), aborts on failure unless
        ``res['allow_failure']`` is truthy, and touches ``tag_finished`` on
        success so :meth:`check_status` can detect completion.
        """
        _set_default_resource(res)
        ret = ''
        ret += "#!/bin/bash -l\n#BSUB -e %J.err\n#BSUB -o %J.out\n"
        if res['numb_gpu'] == 0:
            # CPU job: request nodes*tasks slots, packed node_cpu per host.
            ret += '#BSUB -R span[ptile=%d]\n#BSUB -n %d\n' % (res['node_cpu'], res['numb_node'] * res['task_per_node'])
        else :
            # GPU job: one exclusive-process GPU per slot, numb_gpu slots.
            ret += '#BSUB -R "select[ngpus >0] rusage[ngpus_excl_p=1]"\n#BSUB -n %d\n' % (res['numb_gpu'])
        # NOTE(review): wall-time and memory limits are intentionally not
        # emitted; add '#BSUB -W' / '#BSUB -M' here if limits are needed.
        ret += '#BSUB -J %s\n' % (res['job_name'] if 'job_name' in res else 'dpgen')
        if len(res['partition']) > 0 :
            ret += '#BSUB -q %s\n' % res['partition']
        ret += "\n"
        for ii in res['module_unload_list'] :
            ret += "module unload %s\n" % ii
        for ii in res['module_list'] :
            ret += "module load %s\n" % ii
        ret += "\n"
        for ii in res['source_list'] :
            ret += "source %s\n" %ii
        ret += "\n"
        envs = res['envs']
        if envs is not None :
            for key in envs.keys() :
                ret += 'export %s=%s\n' % (key, envs[key])
            ret += '\n'

        if args is None :
            # No per-task arguments: pair every job dir with an empty string.
            args = []
            for ii in job_dirs:
                args.append('')
        for ii,jj in zip(job_dirs, args) :
            ret += 'cd %s\n' % ii
            ret += 'test $? -ne 0 && exit\n'
            if res['with_mpi'] :
                ret += 'mpirun -machinefile $LSB_DJOB_HOSTFILE -n %d %s %s\n' % (res['numb_node'] * res['task_per_node'], cmd, jj)
            else :
                ret += '%s %s\n' % (cmd, jj)
            if 'allow_failure' not in res or res['allow_failure'] is False:
                ret += 'test $? -ne 0 && exit\n'
            ret += 'cd %s\n' % self.remote_root
            ret += 'test $? -ne 0 && exit\n'
        ret += '\ntouch tag_finished\n'

        script_name = 'run.sub'
        script = os.path.join(self.remote_root, script_name)
        sftp = self.ssh.open_sftp()
        with sftp.open(script, 'w') as fp :
            fp.write(ret)
        sftp.close()

        return script_name
37 changes: 35 additions & 2 deletions generator/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
from lib.pwscf import make_pwscf_input
from lib.pwscf import cvt_1frame
from lib.gaussian import make_gaussian_input
from lib.RemoteJob import SSHSession, JobStatus, SlurmJob, PBSJob, CloudMachineJob
from lib.RemoteJob import SSHSession, JobStatus, SlurmJob, PBSJob, LSFJob, CloudMachineJob

template_name = 'template'
train_name = '00.train'
Expand Down Expand Up @@ -556,7 +556,7 @@ def run_train (iter_index,
trans_comm_data,
forward_files,
backward_files)
elif machine_type == 'pbs' :
elif machine_type == 'pbs':
_group_slurm_jobs(ssh_sess,
train_resources,
command,
Expand All @@ -567,6 +567,17 @@ def run_train (iter_index,
forward_files,
backward_files,
remote_job = PBSJob)
elif machine_type == 'lsf':
_group_slurm_jobs(ssh_sess,
train_resources,
command,
work_path,
run_tasks,
1,
trans_comm_data,
forward_files,
backward_files,
remote_job = LSFJob)
elif machine_type == 'local' :
_group_local_jobs(ssh_sess,
train_resources,
Expand Down Expand Up @@ -844,6 +855,17 @@ def run_model_devi (iter_index,
forward_files,
backward_files,
remote_job = PBSJob)
elif machine_type == 'lsf' :
_group_slurm_jobs(ssh_sess,
model_devi_resources,
command,
work_path,
run_tasks,
model_devi_group_size,
model_names,
forward_files,
backward_files,
remote_job = LSFJob)
elif machine_type == 'local' :
_group_local_jobs(ssh_sess,
model_devi_resources,
Expand Down Expand Up @@ -1268,6 +1290,17 @@ def run_fp_inner (iter_index,
forward_files,
backward_files,
remote_job = PBSJob)
elif machine_type == 'lsf' :
_group_slurm_jobs(ssh_sess,
fp_resources,
fp_command,
work_path,
run_tasks,
fp_group_size,
[],
forward_files,
backward_files,
remote_job = LSFJob)
elif machine_type == 'local' :
_group_local_jobs(ssh_sess,
fp_resources,
Expand Down

0 comments on commit 6f06104

Please sign in to comment.