diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 1d71fa71..fe5b13b4 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -57,8 +57,16 @@ def add_worker(self, scheduler, worker=None, name=None, **kwargs): # if this is the first worker for this job, move job to running if job_id not in self.running_jobs: - logger.debug("this is a new job") - self.running_jobs[job_id] = self.pending_jobs.pop(job_id) + logger.debug("this is a new job or restarting worker") + if job_id in self.pending_jobs: + self.running_jobs[job_id] = self.pending_jobs.pop(job_id) + elif job_id in self.finished_jobs: + logger.warning('Worker %s restart in Job %s. ' + 'This can be due to memory issue.', w, job_id) + self.running_jobs[job_id] = self.finished_jobs.pop(job_id) + else: + logger.error('Unknown job_id: %s for worker %s', job_id, w) + self.running_jobs[job_id] = {} # add worker to dict of workers in this job self.running_jobs[job_id][w.name] = w