Skip to content

Commit

Permalink
When the number of jobs is larger than the number of available VMs, N,
Browse files Browse the repository at this point in the history
jobManager crashes in the exception handler of __manage() after N jobs
have finished, if Config.REUSE_VMS is set to true.  This commit
simply checks whether "job" is None, to avoid the crash.  This allows
the job manager to continue and finish all jobs.
However, this may not be the real fix for the root problem, which needs
further investigation.
  • Loading branch information
Xiaolin (Charlene) Zang committed Aug 11, 2017
1 parent c7f5664 commit 4dcbbb4
Showing 1 changed file with 22 additions and 4 deletions.
26 changes: 22 additions & 4 deletions jobManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
# is launched that will handle things from here on. If anything goes
# wrong, the job is made dead with the error.
#
import threading, logging, time, copy
import threading, logging, time, copy, os

from datetime import datetime
from tango import *
Expand All @@ -27,10 +27,11 @@ def __init__(self, queue):
self.jobQueue = queue
self.preallocator = self.jobQueue.preallocator
self.vmms = self.preallocator.vmms
self.log = logging.getLogger("JobManager")
self.log = logging.getLogger("JobManager-" + str(os.getpid()))
# job-associated instance id
self.nextId = 10000
self.running = False
self.log.info("START jobManager")

def start(self):
if self.running:
Expand Down Expand Up @@ -61,14 +62,24 @@ def __manage(self):
id = self.jobQueue.getNextPendingJob()

if id:
self.log.info("_manage job after getNextPendingJob() %s" % id)

job = self.jobQueue.get(id)
if job is not None:
jobStr = ', '.join("%s: %s" % item for item in job.__dict__.items())
self.log.info("_manage job %s" % jobStr)
if not job.accessKey and Config.REUSE_VMS:
id, vm = self.jobQueue.getNextPendingJobReuse(id)
job = self.jobQueue.get(id)

if job is not None:
jobStr = ', '.join("%s: %s" % item for item in job.__dict__.items())
self.log.info("_manage after getNextPendingJobReuse %s" % jobStr)
else:
self.log.info("_manage after getNextPendingJobReuse %s %s" % (id, vm))
try:
# Mark the job assigned
self.jobQueue.assignJob(job.id)
self.log.info("_manage after assignJob %s" % id)
# if the job has specified an account
# create an VM on the account and run on that instance
if job.accessKeyId:
Expand All @@ -77,13 +88,16 @@ def __manage(self):
newVM = copy.deepcopy(job.vm)
newVM.id = self._getNextID()
preVM = vmms.initializeVM(newVM)
self.log.info("_manage init new vm %s" % preVM.id)
else:
# Try to find a vm on the free list and allocate it to
# the worker if successful.
if Config.REUSE_VMS:
preVM = vm
self.log.info("_manage reuse vm %s" % preVM.id)
else:
preVM = self.preallocator.allocVM(job.vm.name)
self.log.info("_manage allocate vm %s" % preVM.id)
vmms = self.vmms[job.vm.vmms] # Create new vmms object

# Now dispatch the job to a worker
Expand All @@ -102,7 +116,11 @@ def __manage(self):
).start()

except Exception as err:
self.jobQueue.makeDead(job.id, str(err))
if job is not None:
# if True:
self.jobQueue.makeDead(job.id, str(err))
else:
self.log.info("_manage: job is None")

# Sleep for a bit and then check again
time.sleep(Config.DISPATCH_PERIOD)
Expand Down

0 comments on commit 4dcbbb4

Please sign in to comment.