diff --git a/spark/worker.py b/spark/worker.py index 51816b1..6e494a9 100644 --- a/spark/worker.py +++ b/spark/worker.py @@ -109,7 +109,7 @@ def _run_job(self, job): import traceback tb = traceback.format_exc() - jlog.write(tb) + log.write(tb) self._conn.send_job_status(job_id, JobStatus.REJECTED) log.warning(tb) log.info('Rejected job {}'.format(job_id)) @@ -184,19 +184,32 @@ def _request_job(self): # there are no jobs available for us return False - job_module = job_reply.get('module') - job_kind = job_reply.get('kind') - job_id = job_reply.get('uuid') - - if job_kind in self._conf.accepted_job_kinds: - return self._run_job(job_reply) - else: - log.warning( - 'Received job of type {0}::{1} which we can not handle.'.format( - job_module, job_kind + # Now that we've accepted a job, we just reply to the server, even if + # an exception occurs. If we don't, the job is stuck indefinitely and + # we will not be able to accept another. + try: + job_module = job_reply.get('module') + job_kind = job_reply.get('kind') + job_id = job_reply.get('uuid') + + if job_kind in self._conf.accepted_job_kinds: + return self._run_job(job_reply) + else: + log.warning( + 'Received job of type {0}::{1} which we can not handle.'.format( + job_module, job_kind + ) ) - ) + self._conn.send_job_status(job_id, JobStatus.REJECTED) + return False + except: # noqa: E722 pylint: disable=bare-except + import traceback + + tb = traceback.format_exc() + jlog.write(tb) self._conn.send_job_status(job_id, JobStatus.REJECTED) + log.warning(tb) + log.info('Rejected job {} due to exception'.format(job_id)) return False def _update_archive_data(self) -> bool: