From db175b47bd682b368d3788f51ace19759ccb330f Mon Sep 17 00:00:00 2001 From: Guillaume EB Date: Fri, 24 Aug 2018 22:32:03 +0200 Subject: [PATCH 1/2] Search for job_id in finished_jobs, or even create new one --- dask_jobqueue/core.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 1d71fa71..4784c9dc 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -57,8 +57,15 @@ def add_worker(self, scheduler, worker=None, name=None, **kwargs): # if this is the first worker for this job, move job to running if job_id not in self.running_jobs: - logger.debug("this is a new job") - self.running_jobs[job_id] = self.pending_jobs.pop(job_id) + logger.debug("this is a new job or restarting worker") + if job_id in self.pending_jobs: + self.running_jobs[job_id] = self.pending_jobs.pop(job_id) + elif job_id in self.finished_jobs: + logger.warning('Job %s has got a restarting worker %s', job_id, w) + self.running_jobs[job_id] = self.finished_jobs.pop(job_id) + else: + logger.error('Unknown job_id: %s for worker %s', job_id, w) + self.running_jobs[job_id] = {} # add worker to dict of workers in this job self.running_jobs[job_id][w.name] = w From 19551d1beb4372cf1ce1e1a0a73de3a2f793f1ed Mon Sep 17 00:00:00 2001 From: Guillaume EB Date: Tue, 2 Oct 2018 22:43:19 +0200 Subject: [PATCH 2/2] warning about potential memory issue --- dask_jobqueue/core.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 4784c9dc..fe5b13b4 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -61,7 +61,8 @@ def add_worker(self, scheduler, worker=None, name=None, **kwargs): if job_id in self.pending_jobs: self.running_jobs[job_id] = self.pending_jobs.pop(job_id) elif job_id in self.finished_jobs: - logger.warning('Job %s has got a restarting worker %s', job_id, w) + logger.warning('Worker %s restart in Job %s. ' + 'This can be due to memory issue.', w, job_id) self.running_jobs[job_id] = self.finished_jobs.pop(job_id) else: logger.error('Unknown job_id: %s for worker %s', job_id, w)