Skip to content

Commit

Permalink
Merge pull request deepmodeling#249 from AnguseZhang/devel
Browse files Browse the repository at this point in the history
restart shell by uuid
  • Loading branch information
amcadmus authored Mar 21, 2020
2 parents db1bbbf + 631845b commit 903d5a5
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 21 deletions.
5 changes: 4 additions & 1 deletion dpgen/dispatcher/Batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@
class Batch(object) :
def __init__ (self,
context,
uuid_names = False) :
uuid_names = True) :
self.context = context
self.uuid_names = uuid_names
if uuid_names:
self.upload_tag_name = '%s_tag_upload' % self.context.job_uuid
self.finish_tag_name = '%s_tag_finished' % self.context.job_uuid
self.sub_script_name = '%s.sub' % self.context.job_uuid
self.job_id_name = '%s_job_id' % self.context.job_uuid
else:
self.upload_tag_name = 'tag_upload'
self.finish_tag_name = 'tag_finished'
self.sub_script_name = 'run.sub'
self.job_id_name = 'job_id'
Expand Down
23 changes: 11 additions & 12 deletions dpgen/dispatcher/Dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,19 @@ def __init__ (self,
batch_type = 'slurm',
job_record = 'jr.json'):
self.remote_profile = remote_profile

if context_type == 'local':
self.session = LocalSession(remote_profile)
self.context = LocalContext
self.uuid_names = False
self.uuid_names = True
elif context_type == 'lazy-local':
self.session = None
self.context = LazyLocalContext
self.uuid_names = True
elif context_type == 'ssh':
self.session = SSHSession(remote_profile)
self.context = SSHContext
self.uuid_names = False
self.uuid_names = True
else :
raise RuntimeError('unknown context')
if batch_type == 'slurm':
Expand Down Expand Up @@ -134,14 +135,14 @@ def submit_jobs(self,
batch = self.batch(context, uuid_names = self.uuid_names)
rjob = {'context':context, 'batch':batch}
# upload files
tag_upload = '%s_tag_upload' % rjob['context'].job_uuid
if not rjob['context'].check_file_exists(tag_upload):
if not rjob['context'].check_file_exists(rjob['batch'].upload_tag_name):
rjob['context'].upload('.',
forward_common_files)
rjob['context'].upload(cur_chunk,
forward_task_files,
dereference = forward_task_deference)
rjob['context'].write_file(tag_upload, '')

rjob['context'].write_file(rjob['batch'].upload_tag_name, '')
dlog.debug('uploaded files for %s' % task_chunks_str[ii])
# submit new or recover old submission
if not submitted:
Expand Down Expand Up @@ -309,20 +310,18 @@ def make_dispatcher(mdata, mdata_resource=None, work_path=None, run_tasks=None,
dispatcher.init(work_path, run_tasks, group_size)
return dispatcher
else:
try:
hostname = mdata['hostname']
hostname = mdata.get('hostname', None)
#use_uuid = mdata.get('use_uuid', False)
if hostname:
context_type = 'ssh'
except:
else:
context_type = 'local'
try:
batch_type = mdata['batch']
except:
dlog.info('cannot find key "batch" in machine file, try to use deprecated key "machine_type"')
batch_type = mdata['machine_type']
try:
lazy_local = mdata['lazy_local']
except:
lazy_local = False
lazy_local = (mdata.get('lazy-local', False)) or (mdata.get('lazy_local', False))
if lazy_local and context_type == 'local':
dlog.info('Dispatcher switches to the lazy local mode')
context_type = 'lazy-local'
Expand Down
3 changes: 2 additions & 1 deletion dpgen/dispatcher/SSHContext.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,8 @@ def call(self, cmd):

def check_finish(self, cmd_pipes):
return cmd_pipes['stdout'].channel.exit_status_ready()



def get_return(self, cmd_pipes):
if not self.check_finish(cmd_pipes):
return None, None, None
Expand Down
23 changes: 16 additions & 7 deletions dpgen/dispatcher/Shell.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os,getpass,time
from dpgen.dispatcher.Batch import Batch
from dpgen.dispatcher.JobStatus import JobStatus
import datetime

def _default_item(resources, key, value) :
if key not in resources :
Expand All @@ -10,14 +11,13 @@ def _default_item(resources, key, value) :
class Shell(Batch) :

def check_status(self) :
if not hasattr(self, 'proc'):
return JobStatus.unsubmitted
if not self.context.check_finish(self.proc) :
return JobStatus.running
elif (self.context.get_return(self.proc))[0] == 0 :
if self.check_finish_tag():
return JobStatus.finished
else :
elif self.check_running():
return JobStatus.running
else:
return JobStatus.terminated
## warn: cannont distinguish terminated from unsubmitted.

def do_submit(self,
job_dirs,
Expand All @@ -32,6 +32,16 @@ def do_submit(self,
self.context.write_file(self.sub_script_name, script_str)
self.proc = self.context.call('cd %s && exec bash %s' % (self.context.remote_root, self.sub_script_name))

def check_running(self):
uuid_names = self.context.job_uuid
## Check if the uuid.sub is running on remote machine
cnt = 0
ret, stdin, stdout, stderr = self.context.block_call("ps aux | grep %s"%uuid_names)
response_list = stdout.read().decode('utf-8').split("\n")
for response in response_list:
if uuid_names + ".sub" in response:
return True
return False

def default_resources(self, res_) :
if res_ == None :
Expand Down Expand Up @@ -100,4 +110,3 @@ def sub_script_cmd(self,
else :
_cmd = '%s %s' % (_cmd, arg)
return _cmd

0 comments on commit 903d5a5

Please sign in to comment.