
flaky test diagnostics/tests/test_progress.py::test_group_timing #6452

Closed
graingert opened this issue May 26, 2022 · 3 comments · Fixed by #6504
Labels: flaky test (Intermittent failures on CI.)

@graingert (Member) commented:

https://github.com/dask/distributed/runs/6606348457?check_suite_focus=true#step:11:1696

=================================== FAILURES ===================================
______________________________ test_group_timing _______________________________

self = <TCP (closed) rpc.restart local=tcp://127.0.0.1:49094 remote=tcp://127.0.0.1:42945>
deserializers = None

    async def read(self, deserializers=None):
        stream = self.stream
        if stream is None:
            raise CommClosedError()
    
        fmt = "Q"
        fmt_size = struct.calcsize(fmt)
    
        try:
>           frames_nbytes = await stream.read_bytes(fmt_size)
E           tornado.iostream.StreamClosedError: Stream is closed

distributed/comm/tcp.py:226: StreamClosedError

The above exception was the direct cause of the following exception:

kwargs = {'close': True, 'timeout': 24.0}
comm = <TCP (closed) rpc.restart local=tcp://127.0.0.1:49094 remote=tcp://127.0.0.1:42945>

    async def send_recv_from_rpc(**kwargs):
        if self.serializers is not None and kwargs.get("serializers") is None:
            kwargs["serializers"] = self.serializers
        if self.deserializers is not None and kwargs.get("deserializers") is None:
            kwargs["deserializers"] = self.deserializers
        comm = None
        try:
            comm = await self.live_comm()
            comm.name = "rpc." + key
>           result = await send_recv(comm=comm, op=key, **kwargs)

distributed/core.py:894: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

comm = <TCP (closed) rpc.restart local=tcp://127.0.0.1:49094 remote=tcp://127.0.0.1:42945>
reply = True, serializers = None, deserializers = None
kwargs = {'close': True, 'op': 'restart', 'reply': True, 'timeout': 24.0}
msg = {'close': True, 'op': 'restart', 'reply': True, 'timeout': 24.0}
please_close = True, force_close = True

    async def send_recv(
        comm: Comm,
        *,
        reply: bool = True,
        serializers=None,
        deserializers=None,
        **kwargs,
    ):
        """Send and recv with a Comm.
    
        Keyword arguments turn into the message
    
        response = await send_recv(comm, op='ping', reply=True)
        """
        msg = kwargs
        msg["reply"] = reply
        please_close = kwargs.get("close", False)
        force_close = False
        if deserializers is None:
            deserializers = serializers
        if deserializers is not None:
            msg["serializers"] = deserializers
    
        try:
            await comm.write(msg, serializers=serializers, on_error="raise")
            if reply:
>               response = await comm.read(deserializers=deserializers)

distributed/core.py:739: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <TCP (closed) rpc.restart local=tcp://127.0.0.1:49094 remote=tcp://127.0.0.1:42945>
deserializers = None

    async def read(self, deserializers=None):
        stream = self.stream
        if stream is None:
            raise CommClosedError()
    
        fmt = "Q"
        fmt_size = struct.calcsize(fmt)
    
        try:
            frames_nbytes = await stream.read_bytes(fmt_size)
            (frames_nbytes,) = struct.unpack(fmt, frames_nbytes)
    
            frames = host_array(frames_nbytes)
            for i, j in sliding_window(
                2,
                range(0, frames_nbytes + OPENSSL_MAX_CHUNKSIZE, OPENSSL_MAX_CHUNKSIZE),
            ):
                chunk = frames[i:j]
                chunk_nbytes = len(chunk)
                n = await stream.read_into(chunk)
                assert n == chunk_nbytes, (n, chunk_nbytes)
        except StreamClosedError as e:
            self.stream = None
            self._closed = True
            if not sys.is_finalizing():
>               convert_stream_closed_error(self, e)

distributed/comm/tcp.py:242: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

obj = <TCP (closed) rpc.restart local=tcp://127.0.0.1:49094 remote=tcp://127.0.0.1:42945>
exc = StreamClosedError('Stream is closed')

    def convert_stream_closed_error(obj, exc):
        """
        Re-raise StreamClosedError as CommClosedError.
        """
        if exc.real_error is not None:
            # The stream was closed because of an underlying OS error
            exc = exc.real_error
            if ssl and isinstance(exc, ssl.SSLError):
                if "UNKNOWN_CA" in exc.reason:
                    raise FatalCommClosedError(f"in {obj}: {exc.__class__.__name__}: {exc}")
            raise CommClosedError(f"in {obj}: {exc.__class__.__name__}: {exc}") from exc
        else:
>           raise CommClosedError(f"in {obj}: {exc}") from exc
E           distributed.comm.core.CommClosedError: in <TCP (closed) rpc.restart local=tcp://127.0.0.1:49094 remote=tcp://127.0.0.1:42945>: Stream is closed

distributed/comm/tcp.py:150: CommClosedError

The above exception was the direct cause of the following exception:

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:39933', workers: 0, cores: 0, tasks: 0>
a = <Nanny: None, threads: 1>, b = <Nanny: None, threads: 2>

    @gen_cluster(client=True, Worker=Nanny)
    async def test_group_timing(c, s, a, b):
        p = GroupTiming(s)
        s.add_plugin(p)
    
        assert len(p.time) == 2
        assert len(p.nthreads) == 2
    
        futures1 = c.map(slowinc, range(10), delay=0.3)
        futures2 = c.map(slowdec, range(10), delay=0.3)
        await wait(futures1 + futures2)
    
        assert len(p.time) > 2
        assert len(p.nthreads) == len(p.time)
        assert all([nt == s.total_nthreads for nt in p.nthreads])
        assert "slowinc" in p.compute
        assert "slowdec" in p.compute
        assert all([len(v) == len(p.time) for v in p.compute.values()])
        assert s.task_groups.keys() == p.compute.keys()
        assert all(
            [
                abs(s.task_groups[k].all_durations["compute"] - sum(v)) < 1.0e-12
                for k, v in p.compute.items()
            ]
        )
    
>       await s.restart()

distributed/diagnostics/tests/test_progress.py:210: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/utils.py:761: in wrapper
    return await func(*args, **kwargs)
distributed/scheduler.py:5098: in restart
    resps = await asyncio.wait_for(resps, timeout)
/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/asyncio/tasks.py:494: in wait_for
    return fut.result()
distributed/utils.py:218: in All
    result = await tasks.next()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

kwargs = {'close': True, 'timeout': 24.0}
comm = <TCP (closed) rpc.restart local=tcp://127.0.0.1:49094 remote=tcp://127.0.0.1:42945>

    async def send_recv_from_rpc(**kwargs):
        if self.serializers is not None and kwargs.get("serializers") is None:
            kwargs["serializers"] = self.serializers
        if self.deserializers is not None and kwargs.get("deserializers") is None:
            kwargs["deserializers"] = self.deserializers
        comm = None
        try:
            comm = await self.live_comm()
            comm.name = "rpc." + key
            result = await send_recv(comm=comm, op=key, **kwargs)
        except (RPCClosed, CommClosedError) as e:
            if comm:
>               raise type(e)(
                    f"Exception while trying to call remote method {key!r} before comm was established."
                ) from e
E               distributed.comm.core.CommClosedError: Exception while trying to call remote method 'restart' before comm was established.

distributed/core.py:897: CommClosedError
----------------------------- Captured stdout call -----------------------------
Dumped cluster state to test_cluster_dump/test_group_timing.yaml
----------------------------- Captured stderr call -----------------------------
2022-05-26 08:26:59,172 - distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:44773
2022-05-26 08:26:59,172 - distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:44773
2022-05-26 08:26:59,172 - distributed.worker - INFO -          dashboard at:            127.0.0.1:37305
2022-05-26 08:26:59,173 - distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:39933
2022-05-26 08:26:59,173 - distributed.worker - INFO - -------------------------------------------------
2022-05-26 08:26:59,173 - distributed.worker - INFO -               Threads:                          1
2022-05-26 08:26:59,173 - distributed.worker - INFO -                Memory:                   6.78 GiB
2022-05-26 08:26:59,173 - distributed.worker - INFO -       Local Directory: /tmp/tmpahot4b5z/dask-worker-space/worker-vr4ic7w5
2022-05-26 08:26:59,173 - distributed.worker - INFO - -------------------------------------------------
2022-05-26 08:26:59,187 - distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:33261
2022-05-26 08:26:59,187 - distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:33261
2022-05-26 08:26:59,188 - distributed.worker - INFO -          dashboard at:            127.0.0.1:37299
2022-05-26 08:26:59,188 - distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:39933
2022-05-26 08:26:59,188 - distributed.worker - INFO - -------------------------------------------------
2022-05-26 08:26:59,188 - distributed.worker - INFO -               Threads:                          2
2022-05-26 08:26:59,188 - distributed.worker - INFO -                Memory:                   6.78 GiB
2022-05-26 08:26:59,188 - distributed.worker - INFO -       Local Directory: /tmp/tmpahot4b5z/dask-worker-space/worker-xwekb6zo
2022-05-26 08:26:59,188 - distributed.worker - INFO - -------------------------------------------------
2022-05-26 08:26:59,629 - distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:39933
2022-05-26 08:26:59,629 - distributed.worker - INFO - -------------------------------------------------
2022-05-26 08:26:59,630 - distributed.core - INFO - Starting established connection
2022-05-26 08:26:59,656 - distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:39933
2022-05-26 08:26:59,656 - distributed.worker - INFO - -------------------------------------------------
2022-05-26 08:26:59,656 - distributed.core - INFO - Starting established connection
2022-05-26 08:27:02,135 - distributed.scheduler - WARNING - Received heartbeat from unregistered worker 'tcp://127.0.0.1:44773'.
2022-05-26 08:27:02,136 - distributed.worker - ERROR - Scheduler was unaware of this worker 'tcp://127.0.0.1:44773'. Shutting down.
2022-05-26 08:27:02,136 - distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:44773
2022-05-26 08:27:02,144 - distributed.worker - INFO - Connection to scheduler broken. Closing without reporting.  Status: Status.closing
2022-05-26 08:27:02,146 - distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:33261
2022-05-26 08:27:02,146 - distributed.nanny - INFO - Worker closed
2022-05-26 08:27:02,147 - distributed.nanny - ERROR - Worker process died unexpectedly
2022-05-26 08:27:02,147 - distributed.worker - INFO - Connection to scheduler broken. Closing without reporting.  Status: Status.closing
Traceback (most recent call last):
  File "/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/multiprocessing/queues.py", line 245, in _feed
    send_bytes(obj)
  File "/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/multiprocessing/connection.py", line 200, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/multiprocessing/connection.py", line 411, in _send_bytes
    self._send(header + buf)
  File "/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
2022-05-26 08:27:02,335 - distributed.nanny - WARNING - Restarting worker
2022-05-26 08:27:03,766 - distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:37491
2022-05-26 08:27:03,767 - distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:37491
2022-05-26 08:27:03,767 - distributed.worker - INFO -          dashboard at:            127.0.0.1:39653
2022-05-26 08:27:03,767 - distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:39933
2022-05-26 08:27:03,767 - distributed.worker - INFO - -------------------------------------------------
2022-05-26 08:27:03,767 - distributed.worker - INFO -               Threads:                          2
2022-05-26 08:27:03,767 - distributed.worker - INFO -                Memory:                   6.78 GiB
2022-05-26 08:27:03,767 - distributed.worker - INFO -       Local Directory: /tmp/tmpahot4b5z/dask-worker-space/worker-ozdrdinu
2022-05-26 08:27:03,767 - distributed.worker - INFO - -------------------------------------------------
2022-05-26 08:27:03,769 - distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:37491
2022-05-26 08:27:03,769 - distributed.worker - INFO - Closed worker has not yet started: Status.init
2022-05-26 08:27:03,825 - distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:32867
2022-05-26 08:27:03,825 - distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:32867
2022-05-26 08:27:03,825 - distributed.worker - INFO -          dashboard at:            127.0.0.1:42421
2022-05-26 08:27:03,825 - distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:39933
2022-05-26 08:27:03,825 - distributed.worker - INFO - -------------------------------------------------
2022-05-26 08:27:03,825 - distributed.worker - INFO -               Threads:                          1
2022-05-26 08:27:03,825 - distributed.worker - INFO -                Memory:                   6.78 GiB
2022-05-26 08:27:03,825 - distributed.worker - INFO -       Local Directory: /tmp/tmpahot4b5z/dask-worker-space/worker-uo3d6iix
2022-05-26 08:27:03,826 - distributed.worker - INFO - -------------------------------------------------
2022-05-26 08:27:04,277 - distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:39933
2022-05-26 08:27:04,277 - distributed.worker - INFO - -------------------------------------------------
2022-05-26 08:27:04,278 - distributed.core - INFO - Starting established connection
2022-05-26 08:27:04,943 - distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:32867
2022-05-26 08:27:04,944 - distributed.worker - INFO - Connection to scheduler broken. Closing without reporting.  Status: Status.closing
------------------------------ Captured log call -------------------------------
/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/coverage/data.py:129: CoverageWarning: Data file '/home/runner/work/distributed/distributed/.coverage.fv-az465-78.11894.830519' doesn't seem to be a coverage data file: cannot unpack non-iterable NoneType object
  data._warn(str(exc))
/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/coverage/data.py:129: CoverageWarning: Data file '/home/runner/work/distributed/distributed/.coverage.fv-az465-78.12771.494043' doesn't seem to be a coverage data file: cannot unpack non-iterable NoneType object
  data._warn(str(exc))
/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/coverage/data.py:129: CoverageWarning: Data file '/home/runner/work/distributed/distributed/.coverage.fv-az465-78.13829.553757' doesn't seem to be a coverage data file: cannot unpack non-iterable NoneType object
  data._warn(str(exc))
/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/coverage/data.py:129: CoverageWarning: Data file '/home/runner/work/distributed/distributed/.coverage.fv-az465-78.13184.014458' doesn't seem to be a coverage data file: cannot unpack non-iterable NoneType object
  data._warn(str(exc))
/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/coverage/data.py:129: CoverageWarning: Data file '/home/runner/work/distributed/distributed/.coverage.fv-az465-78.10821.172500' doesn't seem to be a coverage data file: cannot unpack non-iterable NoneType object
  data._warn(str(exc))
/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/coverage/data.py:129: CoverageWarning: Data file '/home/runner/work/distributed/distributed/.coverage.fv-az465-78.11321.529407' doesn't seem to be a coverage data file: cannot unpack non-iterable NoneType object
  data._warn(str(exc))
/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/coverage/data.py:129: CoverageWarning: Data file '/home/runner/work/distributed/distributed/.coverage.fv-az465-78.10872.792584' doesn't seem to be a coverage data file: cannot unpack non-iterable NoneType object
  data._warn(str(exc))
/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/coverage/data.py:129: CoverageWarning: Data file '/home/runner/work/distributed/distributed/.coverage.fv-az465-78.11320.329941' doesn't seem to be a coverage data file: cannot unpack non-iterable NoneType object
  data._warn(str(exc))
/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/coverage/data.py:129: CoverageWarning: Data file '/home/runner/work/distributed/distributed/.coverage.fv-az465-78.15696.336646' doesn't seem to be a coverage data file: cannot unpack non-iterable NoneType object
  data._warn(str(exc))
/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/coverage/data.py:129: CoverageWarning: Data file '/home/runner/work/distributed/distributed/.coverage.fv-az465-78.13303.165900' doesn't seem to be a coverage data file: cannot unpack non-iterable NoneType object
  data._warn(str(exc))
/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/coverage/data.py:129: CoverageWarning: Data file '/home/runner/work/distributed/distributed/.coverage.fv-az465-78.13126.769215' doesn't seem to be a coverage data file: cannot unpack non-iterable NoneType object
  data._warn(str(exc))
ERROR    asyncio:base_events.py:1707 _GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
asyncio.exceptions.CancelledError
ERROR    asyncio:base_events.py:1707 Future exception was never retrieved
future: <Future finished exception=CommClosedError("Exception while trying to call remote method 'restart' before comm was established.")>
Traceback (most recent call last):
  File "/home/runner/work/distributed/distributed/distributed/comm/tcp.py", line 226, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/runner/work/distributed/distributed/distributed/core.py", line 894, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "/home/runner/work/distributed/distributed/distributed/core.py", line 739, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/home/runner/work/distributed/distributed/distributed/comm/tcp.py", line 242, in read
    convert_stream_closed_error(self, e)
  File "/home/runner/work/distributed/distributed/distributed/comm/tcp.py", line 150, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) rpc.restart local=tcp://127.0.0.1:42246 remote=tcp://127.0.0.1:40241>: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/tornado/gen.py", line 769, in run
    yielded = self.gen.throw(*exc_info)  # type: ignore
  File "/home/runner/work/distributed/distributed/distributed/utils.py", line 231, in quiet
    yield task
  File "/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/home/runner/work/distributed/distributed/distributed/core.py", line 897, in send_recv_from_rpc
    raise type(e)(
distributed.comm.core.CommClosedError: Exception while trying to call remote method 'restart' before comm was established.
ERROR    asyncio:base_events.py:1707 _GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
asyncio.exceptions.CancelledError
- generated xml file: /home/runner/work/distributed/distributed/reports/pytest.xml -
graingert added the flaky test (Intermittent failures on CI.) label on May 26, 2022
graingert changed the title from "flakey test diagnostics/tests/test_progress.py::test_group_timing" to "flaky test diagnostics/tests/test_progress.py::test_group_timing" on May 26, 2022
hendrikmakait self-assigned this on Jun 2, 2022
@hendrikmakait (Member) commented on Jun 2, 2022:

Condensed test case that usually triggers the failure within ~20 iterations:

from distributed import Nanny, wait
from distributed.utils_test import gen_cluster, slowinc, slowdec


@gen_cluster(client=True, Worker=Nanny)
async def test_trigger_comm_closed_error(c, s, a, b):
    futures1 = c.map(slowinc, range(10), delay=0.3)
    futures2 = c.map(slowdec, range(10), delay=0.3)
    await wait(futures1 + futures2)

    await s.restart()
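One way to hammer this condensed test until it flakes is to repeat it with pytest. This invocation is an illustration, not part of the original report: it assumes the test has been added to a test module (distributed/diagnostics/tests/test_progress.py is used here only as an example) and that the pytest-repeat plugin is installed to provide --count:

    pytest distributed/diagnostics/tests/test_progress.py -k test_trigger_comm_closed_error --count 20 -x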

@hendrikmakait (Member) commented:

This test does not flake if I comment out the following line:

await self.close()

@hendrikmakait (Member) commented:

#6494 addresses the underlying issue of this flake.
