From 408cdb38702df75c11d593822d4582483623b10f Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Fri, 5 Nov 2021 15:42:33 -0500 Subject: [PATCH 1/3] Update Worker._executing handling --- distributed/worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/worker.py b/distributed/worker.py index 8164d93c44..50c3f177e5 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -2060,6 +2060,7 @@ def transition_waiting_ready(self, ts, *, stimulus_id): def transition_generic_error( self, ts, exception, traceback, exception_text, traceback_text, *, stimulus_id ): + self._executing.discard(ts) ts.exception = exception ts.traceback = traceback ts.exception_text = exception_text @@ -2073,7 +2074,6 @@ def transition_executing_error( ): for resource, quantity in ts.resource_restrictions.items(): self.available_resources[resource] += quantity - self._executing.discard(ts) return self.transition_generic_error( ts, exception, From cd6f66499267ca761c49c7470e66a27053879deb Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Fri, 5 Nov 2021 16:36:56 -0500 Subject: [PATCH 2/3] One more --- distributed/worker.py | 1 + 1 file changed, 1 insertion(+) diff --git a/distributed/worker.py b/distributed/worker.py index 50c3f177e5..a1fc018381 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -2135,6 +2135,7 @@ def transition_cancelled_released(self, ts, *, stimulus_id): return recommendations, [] def transition_executing_released(self, ts, *, stimulus_id): + self._executing.discard(ts) ts._previous = ts.state # See https://github.com/dask/distributed/pull/5046#discussion_r685093940 ts.state = "cancelled" From 1f3c619fed7069ea10ba4e030691b5206ec40b38 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Fri, 5 Nov 2021 17:07:52 -0500 Subject: [PATCH 3/3] executing -> released goes through intermediate cancelled transision --- distributed/worker.py | 1 - 1 file changed, 1 deletion(-) diff --git a/distributed/worker.py b/distributed/worker.py index a1fc018381..50c3f177e5 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -2135,7 +2135,6 @@ def transition_cancelled_released(self, ts, *, stimulus_id): return recommendations, [] def transition_executing_released(self, ts, *, stimulus_id): - self._executing.discard(ts) ts._previous = ts.state # See https://github.com/dask/distributed/pull/5046#discussion_r685093940 ts.state = "cancelled"