flaky test distributed/tests/test_scheduler.py::test_restart #6455

Closed
graingert opened this issue May 26, 2022 · 2 comments · Fixed by #6504
Labels
flaky test (Intermittent failures on CI)

Comments

@graingert
Member

https://github.com/dask/distributed/runs/6606349069?check_suite_focus=true#step:11:1265

================================== FAILURES ===================================
________________________________ test_restart _________________________________

self = <TCP (closed) rpc.restart local=tcp://127.0.0.1:52205 remote=tcp://127.0.0.1:52174>
deserializers = None

    async def read(self, deserializers=None):
        stream = self.stream
        if stream is None:
            raise CommClosedError()
    
        fmt = "Q"
        fmt_size = struct.calcsize(fmt)
    
        try:
>           frames_nbytes = await stream.read_bytes(fmt_size)
E           tornado.iostream.StreamClosedError: Stream is closed

distributed\comm\tcp.py:226: StreamClosedError

The above exception was the direct cause of the following exception:

kwargs = {'close': True, 'timeout': 24.0}
comm = <TCP (closed) rpc.restart local=tcp://127.0.0.1:52205 remote=tcp://127.0.0.1:52174>

    async def send_recv_from_rpc(**kwargs):
        if self.serializers is not None and kwargs.get("serializers") is None:
            kwargs["serializers"] = self.serializers
        if self.deserializers is not None and kwargs.get("deserializers") is None:
            kwargs["deserializers"] = self.deserializers
        comm = None
        try:
            comm = await self.live_comm()
            comm.name = "rpc." + key
>           result = await send_recv(comm=comm, op=key, **kwargs)

distributed\core.py:894: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

comm = <TCP (closed) rpc.restart local=tcp://127.0.0.1:52205 remote=tcp://127.0.0.1:52174>
reply = True, serializers = None, deserializers = None
kwargs = {'close': True, 'op': 'restart', 'reply': True, 'timeout': 24.0}
msg = {'close': True, 'op': 'restart', 'reply': True, 'timeout': 24.0}
please_close = True, force_close = True

    async def send_recv(
        comm: Comm,
        *,
        reply: bool = True,
        serializers=None,
        deserializers=None,
        **kwargs,
    ):
        """Send and recv with a Comm.
    
        Keyword arguments turn into the message
    
        response = await send_recv(comm, op='ping', reply=True)
        """
        msg = kwargs
        msg["reply"] = reply
        please_close = kwargs.get("close", False)
        force_close = False
        if deserializers is None:
            deserializers = serializers
        if deserializers is not None:
            msg["serializers"] = deserializers
    
        try:
            await comm.write(msg, serializers=serializers, on_error="raise")
            if reply:
>               response = await comm.read(deserializers=deserializers)

distributed\core.py:739: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <TCP (closed) rpc.restart local=tcp://127.0.0.1:52205 remote=tcp://127.0.0.1:52174>
deserializers = None

    async def read(self, deserializers=None):
        stream = self.stream
        if stream is None:
            raise CommClosedError()
    
        fmt = "Q"
        fmt_size = struct.calcsize(fmt)
    
        try:
            frames_nbytes = await stream.read_bytes(fmt_size)
            (frames_nbytes,) = struct.unpack(fmt, frames_nbytes)
    
            frames = host_array(frames_nbytes)
            for i, j in sliding_window(
                2,
                range(0, frames_nbytes + OPENSSL_MAX_CHUNKSIZE, OPENSSL_MAX_CHUNKSIZE),
            ):
                chunk = frames[i:j]
                chunk_nbytes = len(chunk)
                n = await stream.read_into(chunk)
                assert n == chunk_nbytes, (n, chunk_nbytes)
        except StreamClosedError as e:
            self.stream = None
            self._closed = True
            if not sys.is_finalizing():
>               convert_stream_closed_error(self, e)

distributed\comm\tcp.py:242: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

obj = <TCP (closed) rpc.restart local=tcp://127.0.0.1:52205 remote=tcp://127.0.0.1:52174>
exc = StreamClosedError('Stream is closed')

    def convert_stream_closed_error(obj, exc):
        """
        Re-raise StreamClosedError as CommClosedError.
        """
        if exc.real_error is not None:
            # The stream was closed because of an underlying OS error
            exc = exc.real_error
            if ssl and isinstance(exc, ssl.SSLError):
                if "UNKNOWN_CA" in exc.reason:
                    raise FatalCommClosedError(f"in {obj}: {exc.__class__.__name__}: {exc}")
            raise CommClosedError(f"in {obj}: {exc.__class__.__name__}: {exc}") from exc
        else:
>           raise CommClosedError(f"in {obj}: {exc}") from exc
E           distributed.comm.core.CommClosedError: in <TCP (closed) rpc.restart local=tcp://127.0.0.1:52205 remote=tcp://127.0.0.1:52174>: Stream is closed

distributed\comm\tcp.py:150: CommClosedError

The above exception was the direct cause of the following exception:

    async def async_fn():
        result = None
        with tempfile.TemporaryDirectory() as tmpdir:
            config2 = merge({"temporary-directory": tmpdir}, config)
            with dask.config.set(config2):
                workers = []
                s = False
    
                for _ in range(60):
                    try:
                        s, ws = await start_cluster(
                            nthreads,
                            scheduler,
                            security=security,
                            Worker=Worker,
                            scheduler_kwargs=scheduler_kwargs,
                            worker_kwargs=worker_kwargs,
                        )
                    except Exception as e:
                        logger.error(
                            "Failed to start gen_cluster: "
                            f"{e.__class__.__name__}: {e}; retrying",
                            exc_info=True,
                        )
                        await asyncio.sleep(1)
                    else:
                        workers[:] = ws
                        args = [s] + workers
                        break
                if s is False:
                    raise Exception("Could not start cluster")
                if client:
                    c = await Client(
                        s.address,
                        security=security,
                        asynchronous=True,
                        **client_kwargs,
                    )
                    args = [c] + args
    
                try:
                    coro = func(*args, *outer_args, **kwargs)
                    task = asyncio.create_task(coro)
                    coro2 = asyncio.wait_for(asyncio.shield(task), timeout)
>                   result = await coro2

distributed\utils_test.py:1111: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

fut = <Future finished exception=CommClosedError("Exception while trying to call remote method 'restart' before comm was established.")>
timeout = 60

    async def wait_for(fut, timeout, *, loop=None):
        """Wait for the single Future or coroutine to complete, with timeout.
    
        Coroutine will be wrapped in Task.
    
        Returns result of the Future or coroutine.  When a timeout occurs,
        it cancels the task and raises TimeoutError.  To avoid the task
        cancellation, wrap it in shield().
    
        If the wait is cancelled, the task is also cancelled.
    
        This function is a coroutine.
        """
        if loop is None:
            loop = events.get_running_loop()
        else:
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)
    
        if timeout is None:
            return await fut
    
        if timeout <= 0:
            fut = ensure_future(fut, loop=loop)
    
            if fut.done():
                return fut.result()
    
            await _cancel_and_wait(fut, loop=loop)
            try:
                return fut.result()
            except exceptions.CancelledError as exc:
                raise exceptions.TimeoutError() from exc
    
        waiter = loop.create_future()
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
        cb = functools.partial(_release_waiter, waiter)
    
        fut = ensure_future(fut, loop=loop)
        fut.add_done_callback(cb)
    
        try:
            # wait until the future completes or the timeout
            try:
                await waiter
            except exceptions.CancelledError:
                if fut.done():
                    return fut.result()
                else:
                    fut.remove_done_callback(cb)
                    # We must ensure that the task is not running
                    # after wait_for() returns.
                    # See https://bugs.python.org/issue32751
                    await _cancel_and_wait(fut, loop=loop)
                    raise
    
            if fut.done():
>               return fut.result()

C:\Miniconda3\envs\dask-distributed\lib\asyncio\tasks.py:479: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:52172', workers: 0, cores: 0, tasks: 0>
a = <Nanny: None, threads: 1>, b = <Nanny: None, threads: 2>

    @gen_cluster(client=True, Worker=Nanny, timeout=60)
    async def test_restart(c, s, a, b):
        futures = c.map(inc, range(20))
        await wait(futures)
    
>       await s.restart()

distributed\tests\test_scheduler.py:616: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

args = (<Scheduler 'tcp://127.0.0.1:52172', workers: 0, cores: 0, tasks: 0>,)
kwargs = {}

    async def wrapper(*args, **kwargs):
        with self:
>           return await func(*args, **kwargs)

distributed\utils.py:761: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <Scheduler 'tcp://127.0.0.1:52172', workers: 0, cores: 0, tasks: 0>
client = None, timeout = 30

    @log_errors
    async def restart(self, client=None, timeout=30):
        """Restart all workers. Reset local state."""
        stimulus_id = f"restart-{time()}"
        n_workers = len(self.workers)
    
        logger.info("Send lost future signal to clients")
        for cs in self.clients.values():
            self.client_releases_keys(
                keys=[ts.key for ts in cs.wants_what],
                client=cs.client_key,
                stimulus_id=stimulus_id,
            )
    
        nannies = {addr: ws.nanny for addr, ws in self.workers.items()}
    
        for addr in list(self.workers):
            try:
                # Ask the worker to close if it doesn't have a nanny,
                # otherwise the nanny will kill it anyway
                await self.remove_worker(
                    address=addr, close=addr not in nannies, stimulus_id=stimulus_id
                )
            except Exception:
                logger.info(
                    "Exception while restarting.  This is normal", exc_info=True
                )
    
        self.clear_task_state()
    
        for plugin in list(self.plugins.values()):
            try:
                plugin.restart(self)
            except Exception as e:
                logger.exception(e)
    
        logger.debug("Send kill signal to nannies: %s", nannies)
        async with contextlib.AsyncExitStack() as stack:
            nannies = [
                await stack.enter_async_context(
                    rpc(nanny_address, connection_args=self.connection_args)
                )
                for nanny_address in nannies.values()
                if nanny_address is not None
            ]
    
            resps = All(
                [nanny.restart(close=True, timeout=timeout * 0.8) for nanny in nannies]
            )
            try:
>               resps = await asyncio.wait_for(resps, timeout)

distributed\scheduler.py:5098: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

fut = <Task finished name='Task-62380' coro=<All() done, defined at d:\a\distributed\distributed\distributed\utils.py:201> exception=CommClosedError("Exception while trying to call remote method 'restart' before comm was established.")>
timeout = 30

    async def wait_for(fut, timeout, *, loop=None):
        """Wait for the single Future or coroutine to complete, with timeout.
    
        Coroutine will be wrapped in Task.
    
        Returns result of the Future or coroutine.  When a timeout occurs,
        it cancels the task and raises TimeoutError.  To avoid the task
        cancellation, wrap it in shield().
    
        If the wait is cancelled, the task is also cancelled.
    
        This function is a coroutine.
        """
        if loop is None:
            loop = events.get_running_loop()
        else:
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)
    
        if timeout is None:
            return await fut
    
        if timeout <= 0:
            fut = ensure_future(fut, loop=loop)
    
            if fut.done():
                return fut.result()
    
            await _cancel_and_wait(fut, loop=loop)
            try:
                return fut.result()
            except exceptions.CancelledError as exc:
                raise exceptions.TimeoutError() from exc
    
        waiter = loop.create_future()
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
        cb = functools.partial(_release_waiter, waiter)
    
        fut = ensure_future(fut, loop=loop)
        fut.add_done_callback(cb)
    
        try:
            # wait until the future completes or the timeout
            try:
                await waiter
            except exceptions.CancelledError:
                if fut.done():
                    return fut.result()
                else:
                    fut.remove_done_callback(cb)
                    # We must ensure that the task is not running
                    # after wait_for() returns.
                    # See https://bugs.python.org/issue32751
                    await _cancel_and_wait(fut, loop=loop)
                    raise
    
            if fut.done():
>               return fut.result()

C:\Miniconda3\envs\dask-distributed\lib\asyncio\tasks.py:479: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

args = [<coroutine object rpc.__getattr__.<locals>.send_recv_from_rpc at 0x0000015BE54AE640>, <coroutine object rpc.__getattr__.<locals>.send_recv_from_rpc at 0x0000015BE54AED40>]
quiet_exceptions = ()

    async def All(args, quiet_exceptions=()):
        """Wait on many tasks at the same time
    
        Err once any of the tasks err.
    
        See https://github.com/tornadoweb/tornado/issues/1546
    
        Parameters
        ----------
        args: futures to wait for
        quiet_exceptions: tuple, Exception
            Exception types to avoid logging if they fail
        """
        tasks = gen.WaitIterator(*map(asyncio.ensure_future, args))
        results = [None for _ in args]
        while not tasks.done():
            try:
>               result = await tasks.next()

distributed\utils.py:218: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

kwargs = {'close': True, 'timeout': 24.0}
comm = <TCP (closed) rpc.restart local=tcp://127.0.0.1:52205 remote=tcp://127.0.0.1:52174>

    async def send_recv_from_rpc(**kwargs):
        if self.serializers is not None and kwargs.get("serializers") is None:
            kwargs["serializers"] = self.serializers
        if self.deserializers is not None and kwargs.get("deserializers") is None:
            kwargs["deserializers"] = self.deserializers
        comm = None
        try:
            comm = await self.live_comm()
            comm.name = "rpc." + key
            result = await send_recv(comm=comm, op=key, **kwargs)
        except (RPCClosed, CommClosedError) as e:
            if comm:
>               raise type(e)(
                    f"Exception while trying to call remote method {key!r} before comm was established."
                ) from e
E               distributed.comm.core.CommClosedError: Exception while trying to call remote method 'restart' before comm was established.

distributed\core.py:897: CommClosedError

During handling of the above exception, another exception occurred:

path = 'C:\\Users\\RUNNER~1\\AppData\\Local\\Temp\\tmpy7ypw1sz\\dask-worker-space'
onerror = <function TemporaryDirectory._rmtree.<locals>.onerror at 0x0000015BE3E78C10>

    def _rmtree_unsafe(path, onerror):
        try:
            with os.scandir(path) as scandir_it:
                entries = list(scandir_it)
        except OSError:
            onerror(os.scandir, path, sys.exc_info())
            entries = []
        for entry in entries:
            fullname = entry.path
            if _rmtree_isdir(entry):
                try:
                    if entry.is_symlink():
                        # This can only happen if someone replaces
                        # a directory with a symlink after the call to
                        # os.scandir or entry.is_dir above.
                        raise OSError("Cannot call rmtree on a symbolic link")
                except OSError:
                    onerror(os.path.islink, fullname, sys.exc_info())
                    continue
                _rmtree_unsafe(fullname, onerror)
            else:
                try:
>                   os.unlink(fullname)
E                   PermissionError: [WinError 32] The process cannot access the file because it is being used by another process: 'C:\\Users\\RUNNER~1\\AppData\\Local\\Temp\\tmpy7ypw1sz\\dask-worker-space\\worker-c_b9ylk8.dirlock'

C:\Miniconda3\envs\dask-distributed\lib\shutil.py:625: PermissionError

During handling of the above exception, another exception occurred:

func = <built-in function unlink>
path = 'C:\\Users\\RUNNER~1\\AppData\\Local\\Temp\\tmpy7ypw1sz\\dask-worker-space\\worker-c_b9ylk8.dirlock'
exc_info = (<class 'PermissionError'>, PermissionError(13, 'The process cannot access the file because it is being used by another process'), <traceback object at 0x0000015BE27AFEC0>)

    def onerror(func, path, exc_info):
        if issubclass(exc_info[0], PermissionError):
            def resetperms(path):
                try:
                    _os.chflags(path, 0)
                except AttributeError:
                    pass
                _os.chmod(path, 0o700)
    
            try:
                if path != name:
                    resetperms(_os.path.dirname(path))
                resetperms(path)
    
                try:
>                   _os.unlink(path)
E                   PermissionError: [WinError 32] The process cannot access the file because it is being used by another process: 'C:\\Users\\RUNNER~1\\AppData\\Local\\Temp\\tmpy7ypw1sz\\dask-worker-space\\worker-c_b9ylk8.dirlock'

C:\Miniconda3\envs\dask-distributed\lib\tempfile.py:805: PermissionError

During handling of the above exception, another exception occurred:

args = (), kwds = {}

    @wraps(func)
    def inner(*args, **kwds):
        with self._recreate_cm():
>           return func(*args, **kwds)

C:\Miniconda3\envs\dask-distributed\lib\contextlib.py:79: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed\utils_test.py:1214: in test_func
    return _run_and_close_tornado(async_fn_outer)
distributed\utils_test.py:472: in _run_and_close_tornado
    return asyncio.run(inner_fn())
C:\Miniconda3\envs\dask-distributed\lib\asyncio\runners.py:44: in run
    return loop.run_until_complete(main)
C:\Miniconda3\envs\dask-distributed\lib\asyncio\base_events.py:647: in run_until_complete
    return future.result()
distributed\utils_test.py:469: in inner_fn
    return await async_fn(*args, **kwargs)
distributed\utils_test.py:1211: in async_fn_outer
    return await asyncio.wait_for(async_fn(), timeout=timeout * 2)
C:\Miniconda3\envs\dask-distributed\lib\asyncio\tasks.py:479: in wait_for
    return fut.result()
distributed\utils_test.py:1206: in async_fn
    return result
C:\Miniconda3\envs\dask-distributed\lib\tempfile.py:830: in __exit__
    self.cleanup()
C:\Miniconda3\envs\dask-distributed\lib\tempfile.py:834: in cleanup
    self._rmtree(self.name)
C:\Miniconda3\envs\dask-distributed\lib\tempfile.py:816: in _rmtree
    _shutil.rmtree(name, onerror=onerror)
C:\Miniconda3\envs\dask-distributed\lib\shutil.py:757: in rmtree
    return _rmtree_unsafe(path, onerror)
C:\Miniconda3\envs\dask-distributed\lib\shutil.py:622: in _rmtree_unsafe
    _rmtree_unsafe(fullname, onerror)
C:\Miniconda3\envs\dask-distributed\lib\shutil.py:627: in _rmtree_unsafe
    onerror(os.unlink, fullname, sys.exc_info())
C:\Miniconda3\envs\dask-distributed\lib\tempfile.py:808: in onerror
    cls._rmtree(path)
C:\Miniconda3\envs\dask-distributed\lib\tempfile.py:816: in _rmtree
    _shutil.rmtree(name, onerror=onerror)
C:\Miniconda3\envs\dask-distributed\lib\shutil.py:757: in rmtree
    return _rmtree_unsafe(path, onerror)
C:\Miniconda3\envs\dask-distributed\lib\shutil.py:608: in _rmtree_unsafe
    onerror(os.scandir, path, sys.exc_info())
2022-05-26 08:50:34,125 - distributed.utils_perf - WARNING - full garbage collections took 33% CPU time recently (threshold: 10%)
2022-05-26 08:50:35,318 - distributed.utils_perf - WARNING - full garbage collections took 31% CPU time recently (threshold: 10%)
2022-05-26 08:50:36,588 - distributed.utils_perf - WARNING - full garbage collections took 31% CPU time recently (threshold: 10%)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

path = 'C:\\Users\\RUNNER~1\\AppData\\Local\\Temp\\tmpy7ypw1sz\\dask-worker-space\\worker-c_b9ylk8.dirlock'
onerror = <function TemporaryDirectory._rmtree.<locals>.onerror at 0x0000015BE576C040>

    def _rmtree_unsafe(path, onerror):
        try:
>           with os.scandir(path) as scandir_it:
E           NotADirectoryError: [WinError 267] The directory name is invalid: 'C:\\Users\\RUNNER~1\\AppData\\Local\\Temp\\tmpy7ypw1sz\\dask-worker-space\\worker-c_b9ylk8.dirlock'

C:\Miniconda3\envs\dask-distributed\lib\shutil.py:605: NotADirectoryError
---------------------------- Captured stdout call -----------------------------
Dumped cluster state to test_cluster_dump\test_restart.yaml
---------------------------- Captured stderr call -----------------------------
2022-05-26 08:43:35,072 - distributed.scheduler - INFO - State start
2022-05-26 08:43:35,085 - distributed.scheduler - INFO - Clear task state
2022-05-26 08:43:35,086 - distributed.scheduler - INFO -   Scheduler at:     tcp://127.0.0.1:52172
2022-05-26 08:43:35,086 - distributed.scheduler - INFO -   dashboard at:           127.0.0.1:52171
2022-05-26 08:43:35,116 - distributed.nanny - INFO -         Start Nanny at: 'tcp://127.0.0.1:52174'
2022-05-26 08:43:35,117 - distributed.nanny - INFO -         Start Nanny at: 'tcp://127.0.0.1:52173'
2022-05-26 08:43:44,624 - distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:52183

2022-05-26 08:43:44,624 - distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:52183

2022-05-26 08:43:44,624 - distributed.worker - INFO -          dashboard at:            127.0.0.1:52184

2022-05-26 08:43:44,624 - distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:52172

2022-05-26 08:43:44,625 - distributed.worker - INFO - -------------------------------------------------

2022-05-26 08:43:44,625 - distributed.worker - INFO -               Threads:                          1

2022-05-26 08:43:44,625 - distributed.worker - INFO -                Memory:                   7.00 GiB

2022-05-26 08:43:44,625 - distributed.worker - INFO -       Local Directory: C:\Users\RUNNER~1\AppData\Local\Temp\tmpy7ypw1sz\dask-worker-space\worker-jfalx0k4

2022-05-26 08:43:44,625 - distributed.worker - INFO - -------------------------------------------------

2022-05-26 08:43:45,652 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:52183', name: 0, status: init, memory: 0, processing: 0>
2022-05-26 08:43:45,653 - distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:52183
2022-05-26 08:43:45,653 - distributed.core - INFO - Starting established connection
2022-05-26 08:43:45,654 - distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:52172

2022-05-26 08:43:45,655 - distributed.worker - INFO - -------------------------------------------------

2022-05-26 08:43:45,655 - distributed.core - INFO - Starting established connection

2022-05-26 08:43:45,695 - distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:52191

2022-05-26 08:43:45,696 - distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:52191

2022-05-26 08:43:45,696 - distributed.worker - INFO -          dashboard at:            127.0.0.1:52192

2022-05-26 08:43:45,696 - distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:52172

2022-05-26 08:43:45,696 - distributed.worker - INFO - -------------------------------------------------

2022-05-26 08:43:45,696 - distributed.worker - INFO -               Threads:                          2

2022-05-26 08:43:45,696 - distributed.worker - INFO -                Memory:                   7.00 GiB

2022-05-26 08:43:45,696 - distributed.worker - INFO -       Local Directory: C:\Users\RUNNER~1\AppData\Local\Temp\tmpy7ypw1sz\dask-worker-space\worker-uayyvx5n

2022-05-26 08:43:45,696 - distributed.worker - INFO - -------------------------------------------------

2022-05-26 08:43:46,815 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:52191', name: 1, status: init, memory: 0, processing: 0>
2022-05-26 08:43:46,816 - distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:52191
2022-05-26 08:43:46,816 - distributed.core - INFO - Starting established connection
2022-05-26 08:43:46,817 - distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:52172

2022-05-26 08:43:46,817 - distributed.worker - INFO - -------------------------------------------------

2022-05-26 08:43:46,818 - distributed.core - INFO - Starting established connection

2022-05-26 08:43:46,850 - distributed.scheduler - INFO - Receive client connection: Client-f4ffd945-dccf-11ec-8608-000d3a345d56
2022-05-26 08:43:46,852 - distributed.core - INFO - Starting established connection
2022-05-26 08:43:48,166 - distributed.scheduler - INFO - Send lost future signal to clients
2022-05-26 08:43:48,167 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://127.0.0.1:52183', name: 0, status: running, memory: 0, processing: 0>
2022-05-26 08:43:48,168 - distributed.core - INFO - Removing comms to tcp://127.0.0.1:52183
2022-05-26 08:43:48,168 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://127.0.0.1:52191', name: 1, status: running, memory: 0, processing: 0>
2022-05-26 08:43:48,168 - distributed.core - INFO - Removing comms to tcp://127.0.0.1:52191
2022-05-26 08:43:48,168 - distributed.scheduler - INFO - Lost all workers
2022-05-26 08:43:48,169 - distributed.scheduler - INFO - Clear task state
2022-05-26 08:43:48,182 - distributed.scheduler - WARNING - Received heartbeat from unregistered worker 'tcp://127.0.0.1:52191'.
2022-05-26 08:43:48,183 - distributed.worker - ERROR - Scheduler was unaware of this worker 'tcp://127.0.0.1:52191'. Shutting down.

2022-05-26 08:43:48,183 - distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:52191

2022-05-26 08:43:48,189 - distributed.nanny - INFO - Nanny asking worker to close
2022-05-26 08:43:48,197 - distributed.nanny - INFO - Nanny asking worker to close
2022-05-26 08:43:48,200 - distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:52183

2022-05-26 08:43:48,204 - distributed.worker - INFO - Connection to scheduler broken. Closing without reporting.  Status: Status.closing

2022-05-26 08:43:48,211 - distributed.worker - INFO - Connection to scheduler broken. Closing without reporting.  Status: Status.closing

2022-05-26 08:43:48,215 - distributed.nanny - INFO - Worker closed

Traceback (most recent call last):

  File "C:\Miniconda3\envs\dask-distributed\lib\multiprocessing\queues.py", line 247, in _feed

    send_bytes(obj)

  File "C:\Miniconda3\envs\dask-distributed\lib\multiprocessing\connection.py", line 205, in send_bytes

    self._send_bytes(m[offset:offset + size])

  File "C:\Miniconda3\envs\dask-distributed\lib\multiprocessing\connection.py", line 285, in _send_bytes

    ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)

BrokenPipeError: [WinError 232] The pipe is being closed

2022-05-26 08:43:49,178 - distributed.nanny - WARNING - Restarting worker
2022-05-26 08:43:49,183 - distributed.nanny - INFO - Closing Nanny at 'tcp://127.0.0.1:52174'.
2022-05-26 08:43:52,524 - distributed.scheduler - INFO - Remove client Client-f4ffd945-dccf-11ec-8608-000d3a345d56
2022-05-26 08:43:52,525 - distributed.scheduler - INFO - Remove client Client-f4ffd945-dccf-11ec-8608-000d3a345d56
2022-05-26 08:43:52,525 - distributed.scheduler - INFO - Close client connection: Client-f4ffd945-dccf-11ec-8608-000d3a345d56
2022-05-26 08:43:52,527 - distributed.nanny - INFO - Closing Nanny at 'tcp://127.0.0.1:52173'.
2022-05-26 08:43:52,528 - distributed.nanny - INFO - Nanny asking worker to close
2022-05-26 08:43:53,775 - tornado.application - ERROR - Exception in callback functools.partial(<built-in method set_result of _asyncio.Future object at 0x0000015BE55FF040>, None)
Traceback (most recent call last):
  File "C:\Miniconda3\envs\dask-distributed\lib\site-packages\tornado\ioloop.py", line 741, in _run_callback
    ret = callback()
asyncio.exceptions.InvalidStateError: invalid state
2022-05-26 08:43:55,702 - distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:52212

2022-05-26 08:43:55,703 - distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:52212

2022-05-26 08:43:55,703 - distributed.worker - INFO -          dashboard at:            127.0.0.1:52213

2022-05-26 08:43:55,703 - distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:52172

2022-05-26 08:43:55,704 - distributed.worker - INFO - -------------------------------------------------

2022-05-26 08:43:55,704 - distributed.worker - INFO -               Threads:                          1

2022-05-26 08:43:55,704 - distributed.worker - INFO -                Memory:                   7.00 GiB

2022-05-26 08:43:55,704 - distributed.worker - INFO -       Local Directory: C:\Users\RUNNER~1\AppData\Local\Temp\tmpy7ypw1sz\dask-worker-space\worker-55fyl_sg

2022-05-26 08:43:55,704 - distributed.worker - INFO - -------------------------------------------------

2022-05-26 08:43:55,707 - distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:52212

2022-05-26 08:43:55,707 - distributed.worker - INFO - Closed worker has not yet started: Status.init

2022-05-26 08:43:55,926 - distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:52219

2022-05-26 08:43:55,927 - distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:52219

2022-05-26 08:43:55,927 - distributed.worker - INFO -          dashboard at:            127.0.0.1:52220

2022-05-26 08:43:55,927 - distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:52172

2022-05-26 08:43:55,927 - distributed.worker - INFO - -------------------------------------------------

2022-05-26 08:43:55,927 - distributed.worker - INFO -               Threads:                          2

2022-05-26 08:43:55,927 - distributed.worker - INFO -                Memory:                   7.00 GiB

2022-05-26 08:43:55,927 - distributed.worker - INFO -       Local Directory: C:\Users\RUNNER~1\AppData\Local\Temp\tmpy7ypw1sz\dask-worker-space\worker-c_b9ylk8

2022-05-26 08:43:55,928 - distributed.worker - INFO - -------------------------------------------------

2022-05-26 08:43:55,933 - distributed.scheduler - INFO - Scheduler closing...
2022-05-26 08:43:55,934 - distributed.scheduler - INFO - Scheduler closing all comms
------------------------------ Captured log call ------------------------------
ERROR    asyncio:base_events.py:1753 Future exception was never retrieved
future: <Future finished exception=CommClosedError("Exception while trying to call remote method 'restart' before comm was established.")>
Traceback (most recent call last):
  File "d:\a\distributed\distributed\distributed\comm\tcp.py", line 226, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "d:\a\distributed\distributed\distributed\core.py", line 894, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "d:\a\distributed\distributed\distributed\core.py", line 739, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "d:\a\distributed\distributed\distributed\comm\tcp.py", line 242, in read
    convert_stream_closed_error(self, e)
  File "d:\a\distributed\distributed\distributed\comm\tcp.py", line 150, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) rpc.restart local=tcp://127.0.0.1:52203 remote=tcp://127.0.0.1:52173>: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:\Miniconda3\envs\dask-distributed\lib\site-packages\tornado\gen.py", line 769, in run
    yielded = self.gen.throw(*exc_info)  # type: ignore
  File "d:\a\distributed\distributed\distributed\utils.py", line 231, in quiet
    yield task
  File "C:\Miniconda3\envs\dask-distributed\lib\site-packages\tornado\gen.py", line 762, in run
    value = future.result()
  File "d:\a\distributed\distributed\distributed\core.py", line 897, in send_recv_from_rpc
    raise type(e)(
distributed.comm.core.CommClosedError: Exception while trying to call remote method 'restart' before comm was established.
ERROR    asyncio.events:utils.py:787 
Traceback (most recent call last):
  File "d:\a\distributed\distributed\distributed\comm\tcp.py", line 226, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "d:\a\distributed\distributed\distributed\core.py", line 894, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "d:\a\distributed\distributed\distributed\core.py", line 739, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "d:\a\distributed\distributed\distributed\comm\tcp.py", line 242, in read
    convert_stream_closed_error(self, e)
  File "d:\a\distributed\distributed\distributed\comm\tcp.py", line 150, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) rpc.restart local=tcp://127.0.0.1:52205 remote=tcp://127.0.0.1:52174>: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "d:\a\distributed\distributed\distributed\utils_test.py", line 1111, in async_fn
    result = await coro2
  File "C:\Miniconda3\envs\dask-distributed\lib\asyncio\tasks.py", line 479, in wait_for
    return fut.result()
  File "D:\a\distributed\distributed\distributed\tests\test_scheduler.py", line 616, in test_restart
    await s.restart()
  File "d:\a\distributed\distributed\distributed\utils.py", line 761, in wrapper
    return await func(*args, **kwargs)
  File "d:\a\distributed\distributed\distributed\scheduler.py", line 5098, in restart
    resps = await asyncio.wait_for(resps, timeout)
  File "C:\Miniconda3\envs\dask-distributed\lib\asyncio\tasks.py", line 479, in wait_for
    return fut.result()
  File "d:\a\distributed\distributed\distributed\utils.py", line 218, in All
    result = await tasks.next()
  File "d:\a\distributed\distributed\distributed\core.py", line 897, in send_recv_from_rpc
    raise type(e)(
distributed.comm.core.CommClosedError: Exception while trying to call remote method 'restart' before comm was established.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Miniconda3\envs\dask-distributed\lib\shutil.py", line 625, in _rmtree_unsafe
    os.unlink(fullname)
PermissionError: [WinError 32] The process cannot access the file because it is being used by another process: 'C:\\Users\\RUNNER~1\\AppData\\Local\\Temp\\tmpy7ypw1sz\\dask-worker-space\\worker-c_b9ylk8.dirlock'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Miniconda3\envs\dask-distributed\lib\tempfile.py", line 805, in onerror
    _os.unlink(path)
PermissionError: [WinError 32] The process cannot access the file because it is being used by another process: 'C:\\Users\\RUNNER~1\\AppData\\Local\\Temp\\tmpy7ypw1sz\\dask-worker-space\\worker-c_b9ylk8.dirlock'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Miniconda3\envs\dask-distributed\lib\asyncio\runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "C:\Miniconda3\envs\dask-distributed\lib\asyncio\base_events.py", line 647, in run_until_complete
    return future.result()
  File "d:\a\distributed\distributed\distributed\utils_test.py", line 469, in inner_fn
    return await async_fn(*args, **kwargs)
  File "d:\a\distributed\distributed\distributed\utils_test.py", line 1211, in async_fn_outer
    return await asyncio.wait_for(async_fn(), timeout=timeout * 2)
  File "C:\Miniconda3\envs\dask-distributed\lib\asyncio\tasks.py", line 479, in wait_for
    return fut.result()
  File "d:\a\distributed\distributed\distributed\utils_test.py", line 1206, in async_fn
    return result
  File "C:\Miniconda3\envs\dask-distributed\lib\tempfile.py", line 830, in __exit__
    self.cleanup()
  File "C:\Miniconda3\envs\dask-distributed\lib\tempfile.py", line 834, in cleanup
    self._rmtree(self.name)
  File "C:\Miniconda3\envs\dask-distributed\lib\tempfile.py", line 816, in _rmtree
    _shutil.rmtree(name, onerror=onerror)
  File "C:\Miniconda3\envs\dask-distributed\lib\shutil.py", line 757, in rmtree
    return _rmtree_unsafe(path, onerror)
  File "C:\Miniconda3\envs\dask-distributed\lib\shutil.py", line 622, in _rmtree_unsafe
    _rmtree_unsafe(fullname, onerror)
  File "C:\Miniconda3\envs\dask-distributed\lib\shutil.py", line 627, in _rmtree_unsafe
    onerror(os.unlink, fullname, sys.exc_info())
  File "C:\Miniconda3\envs\dask-distributed\lib\tempfile.py", line 808, in onerror
    cls._rmtree(path)
  File "C:\Miniconda3\envs\dask-distributed\lib\tempfile.py", line 816, in _rmtree
    _shutil.rmtree(name, onerror=onerror)
  File "C:\Miniconda3\envs\dask-distributed\lib\shutil.py", line 757, in rmtree
    return _rmtree_unsafe(path, onerror)
  File "C:\Miniconda3\envs\dask-distributed\lib\shutil.py", line 608, in _rmtree_unsafe
    onerror(os.scandir, path, sys.exc_info())
  File "C:\Miniconda3\envs\dask-distributed\lib\shutil.py", line 605, in _rmtree_unsafe
    with os.scandir(path) as scandir_it:
NotADirectoryError: [WinError 267] The directory name is invalid: 'C:\\Users\\RUNNER~1\\AppData\\Local\\Temp\\tmpy7ypw1sz\\dask-worker-space\\worker-c_b9ylk8.dirlock'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "d:\a\distributed\distributed\distributed\nanny.py", line 632, in start
    await self.running.wait()
  File "C:\Miniconda3\envs\dask-distributed\lib\asyncio\locks.py", line 226, in wait
    await fut
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "d:\a\distributed\distributed\distributed\utils.py", line 761, in wrapper
    return await func(*args, **kwargs)
  File "d:\a\distributed\distributed\distributed\nanny.py", line 520, in _on_exit
    await self.instantiate()
  File "d:\a\distributed\distributed\distributed\nanny.py", line 410, in instantiate
    result = await asyncio.wait_for(
  File "C:\Miniconda3\envs\dask-distributed\lib\asyncio\tasks.py", line 469, in wait_for
    return fut.result()
asyncio.exceptions.CancelledError
----- generated xml file: D:\a\distributed\distributed\reports\pytest.xml -----
@hendrikmakait
Member

From the captured logs, this might be related to #6494.

@hendrikmakait added the flaky test label on Jun 3, 2022
@gjoseph92
Collaborator

This is related to #6494, but subtly different.

The problem here is that this Scheduler.restart logic isn't what we should be doing in the first place, and it also has a bug in it:

nannies = {addr: ws.nanny for addr, ws in self.workers.items()}

for addr in list(self.workers):
    try:
        # Ask the worker to close if it doesn't have a nanny,
        # otherwise the nanny will kill it anyway
        await self.remove_worker(
            address=addr, close=addr not in nannies, stimulus_id=stimulus_id
        )

  1. addr not in nannies will always be False, because nannies contains (despite the name) every worker in self.workers; see scheduler.py L5094. A standalone snippet illustrating this follows the list.
  2. This removes the Nanny-based workers from self.workers immediately, even though we are going to RPC with them a few lines below. When #6494 (client.restart() may cause workers to shut down instead of restarting) happens, the worker heartbeats, is told it's missing, and then shuts down in the middle of our RPC with the Nanny asking it to restart. That causes this error. This is yet another incarnation of #6390 (eliminate partially-removed-worker state on scheduler: comms open, state removed). We just shouldn't be removing entries from self.workers until all connections with them are actually closed.
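
To make point 1 concrete, here is a tiny runnable snippet (addresses invented, dict shape mirroring the comprehension quoted above) showing that the membership test can never be True, and what the intended "has no nanny" condition presumably was:

workers = {
    "tcp://127.0.0.1:52183": "tcp://127.0.0.1:52174",  # worker -> nanny address
    "tcp://127.0.0.1:52191": None,                     # worker without a nanny
}
# Keyed by *worker* address, just like the comprehension above:
nannies = {addr: nanny_addr for addr, nanny_addr in workers.items()}

for addr in workers:
    print(addr not in nannies)    # always False: every worker address is a key
    print(nannies[addr] is None)  # presumably the intended check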

This will require a small fix on the scheduler side to restart, but in doing so, I think it'll also fix #6494.
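
For illustration only, a rough self-contained sketch of that direction (toy code with invented names, not the actual patch from #6504): talk to the nannies first, and only drop workers from scheduler state once those RPCs have completed.

import asyncio

async def restart(state):
    # state: worker address -> nanny address (None means no nanny)
    nannies = dict(state)

    async def ask_nanny_to_restart(nanny_addr):
        await asyncio.sleep(0)  # stand-in for the real nanny restart RPC
        print(f"restarted worker via nanny at {nanny_addr}")

    # 1. RPC the nannies while every worker is still registered, so a
    #    concurrent heartbeat can't be told its worker is unknown.
    await asyncio.gather(
        *(ask_nanny_to_restart(n) for n in nannies.values() if n is not None)
    )

    # 2. Only now remove workers from state; close directly iff no nanny.
    for addr in list(state):
        print(f"remove {addr} (close={nannies[addr] is None})")
        del state[addr]

asyncio.run(restart({
    "tcp://127.0.0.1:52183": "tcp://127.0.0.1:52174",
    "tcp://127.0.0.1:52191": None,
}))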
