Skip to content

Commit

Permalink
Remove is_python_shutting_down (#8492)
Browse files Browse the repository at this point in the history
  • Loading branch information
hendrikmakait authored Jul 1, 2024
1 parent 50700f3 commit 147c505
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 49 deletions.
21 changes: 0 additions & 21 deletions distributed/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,27 +97,6 @@ def __getattr__(name):
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")


_python_shutting_down = False


@atexit.register
def _():
"""Set a global when Python shuts down.
Note
----
This function must be registered with atexit *after* any class that invokes
``distributed.utils.is_python_shutting_down`` has been defined. This way it
will be called before the ``__del__`` method of those classes.
See Also
--------
distributed.utils.is_python_shutting_down
"""
global _python_shutting_down
_python_shutting_down = True


__all__ = [
"Actor",
"ActorFuture",
Expand Down
11 changes: 7 additions & 4 deletions distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@
format_dashboard_link,
has_keyword,
import_term,
is_python_shutting_down,
log_errors,
nbytes,
sync,
Expand Down Expand Up @@ -272,6 +271,8 @@ class Future(WrappedKey):
Client: Creates futures
"""

_is_finalizing: staticmethod[[], bool] = staticmethod(sys.is_finalizing)

_cb_executor = None
_cb_executor_pid = None
_counter = itertools.count()
Expand Down Expand Up @@ -586,7 +587,7 @@ def __del__(self):
except AttributeError:
# Occasionally we see this error when shutting down the client
# https://github.com/dask/distributed/issues/4305
if not is_python_shutting_down():
if not self._is_finalizing():
raise
except RuntimeError: # closed event loop
pass
Expand Down Expand Up @@ -900,6 +901,8 @@ class Client(SyncMethodMixin):
distributed.LocalCluster:
"""

_is_finalizing: staticmethod[[], bool] = staticmethod(sys.is_finalizing)

_instances: ClassVar[weakref.WeakSet[Client]] = weakref.WeakSet()

_default_event_handlers = {"print": _handle_print, "warn": _handle_warn}
Expand Down Expand Up @@ -1628,7 +1631,7 @@ async def _handle_report(self):
try:
msgs = await self.scheduler_comm.comm.read()
except CommClosedError:
if is_python_shutting_down():
if self._is_finalizing():
return
if self.status == "running":
if self.cluster and self.cluster.status in (
Expand Down Expand Up @@ -1852,7 +1855,7 @@ def close(self, timeout=no_default):
sync(self.loop, self._close, fast=True, callback_timeout=timeout)
assert self.status == "closed"

if not is_python_shutting_down():
if not self._is_finalizing():
self._loop_runner.stop()

async def _shutdown(self):
Expand Down
11 changes: 8 additions & 3 deletions distributed/comm/inproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import itertools
import logging
import os
import sys
import threading
import weakref
from collections import deque, namedtuple
Expand All @@ -14,7 +15,7 @@
from distributed.comm.core import BaseListener, Comm, CommClosedError, Connector
from distributed.comm.registry import Backend, backends
from distributed.protocol.serialize import _nested_deserialize
from distributed.utils import get_ip, is_python_shutting_down
from distributed.utils import get_ip

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -187,9 +188,13 @@ def _get_finalizer(self):
r = repr(self)

def finalize(
read_q=self._read_q, write_q=self._write_q, write_loop=self._write_loop, r=r
read_q=self._read_q,
write_q=self._write_q,
write_loop=self._write_loop,
is_finalizing=sys.is_finalizing,
r=r,
):
if read_q.peek(None) is _EOF or is_python_shutting_down():
if read_q.peek(None) is _EOF or is_finalizing():
return
logger.warning(f"Closing dangling queue in {r}")
write_loop.add_callback(write_q.put_nowait, _EOF)
Expand Down
7 changes: 4 additions & 3 deletions distributed/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
get_traceback,
has_keyword,
import_file,
is_python_shutting_down,
iscoroutinefunction,
offload,
recursive_to_dict,
Expand Down Expand Up @@ -321,6 +320,8 @@ class Server:
"""

_is_finalizing: staticmethod[[], bool] = staticmethod(sys.is_finalizing)

default_ip: ClassVar[str] = ""
default_port: ClassVar[int] = 0

Expand Down Expand Up @@ -902,7 +903,7 @@ async def _handle_comm(self, comm: Comm) -> None:
msg = await comm.read()
logger.debug("Message from %r: %s", address, msg)
except OSError as e:
if not is_python_shutting_down():
if not self._is_finalizing():
logger.debug(
"Lost connection to %r while reading message: %s."
" Last operation: %s",
Expand Down Expand Up @@ -1006,7 +1007,7 @@ async def _handle_comm(self, comm: Comm) -> None:

finally:
del self._comms[comm]
if not is_python_shutting_down() and not comm.closed():
if not self._is_finalizing() and not comm.closed():
try:
comm.abort()
except Exception as e:
Expand Down
3 changes: 1 addition & 2 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@
TimeoutError,
format_dashboard_link,
get_fileno_limit,
is_python_shutting_down,
key_split_group,
log_errors,
offload,
Expand Down Expand Up @@ -5786,7 +5785,7 @@ async def add_client(
if not comm.closed():
self.client_comms[client].send({"op": "stream-closed"})
try:
if not is_python_shutting_down():
if not self._is_finalizing():
await self.client_comms[client].close()
del self.client_comms[client]
if self.status == Status.running:
Expand Down
13 changes: 0 additions & 13 deletions distributed/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1831,19 +1831,6 @@ def recursive_to_dict(
tok.var.reset(tok)


def is_python_shutting_down() -> bool:
"""Is the interpreter shutting down now?
This is a variant of ``sys.is_finalizing`` which can return True inside the ``__del__``
method of classes defined inside the distributed package.
"""
# This import must remain local for the global variable to be
# properly evaluated
from distributed import _python_shutting_down

return _python_shutting_down


class Deadline:
"""Utility class tracking a deadline and the progress toward it"""

Expand Down
7 changes: 4 additions & 3 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@
get_ip,
has_arg,
in_async_call,
is_python_shutting_down,
iscoroutinefunction,
json_load_robust,
log_errors,
Expand Down Expand Up @@ -1634,7 +1633,9 @@ def _close(executor, wait):
# weird deadlocks particularly if the task that is executing in
# the thread is waiting for a server reply, e.g. when using
# worker clients, semaphores, etc.
if is_python_shutting_down():

# Are we shutting down the process?
if self._is_finalizing() or not threading.main_thread().is_alive():
# If we're shutting down there is no need to wait for daemon
# threads to finish
_close(executor=executor, wait=False)
Expand All @@ -1643,7 +1644,7 @@ def _close(executor, wait):
await asyncio.to_thread(
_close, executor=executor, wait=executor_wait
)
except RuntimeError: # Are we shutting down the process?
except RuntimeError:
logger.error(
"Could not close executor %r by dispatching to thread. Trying synchronously.",
executor,
Expand Down

0 comments on commit 147c505

Please sign in to comment.