Possible deadlock with P2P DataFrame shuffle #934

Closed
github-actions bot opened this issue Aug 7, 2023 · 4 comments

github-actions bot commented Aug 7, 2023

Workflow Run URL

Cluster link https://cloud.coiled.io/clusters/253029/information?account=dask-benchmarks&tab=Logs&filterPattern=&sinceMs=1691428782732&untilMs=1691429222732

Traceback
=================================== FAILURES ===================================
_________________________ test_set_index[1-p2p-False] __________________________
[gw2] linux -- Python 3.9.16 /usr/share/miniconda3/envs/test/bin/python

args = {<coroutine object FutureState.wait at 0x7f725c3cc440>}
quiet_exceptions = ()

    async def All(args, quiet_exceptions=()):
        """Wait on many tasks at the same time
    
        Err once any of the tasks err.
    
        See https://github.com/tornadoweb/tornado/issues/1546
    
        Parameters
        ----------
        args: futures to wait for
        quiet_exceptions: tuple, Exception
            Exception types to avoid logging if they fail
        """
        tasks = gen.WaitIterator(*map(asyncio.ensure_future, args))
        results = [None for _ in args]
        while not tasks.done():
            try:
>               result = await tasks.next()
E               asyncio.exceptions.CancelledError

/usr/share/miniconda3/envs/test/lib/python3.9/site-packages/distributed/utils.py:252: CancelledError

During handling of the above exception, another exception occurred:

fut = <Task cancelled name='Task-24998' coro=<All() done, defined at /usr/share/miniconda3/envs/test/lib/python3.9/site-packages/distributed/utils.py:235>>
timeout = 1200

    async def wait_for(fut, timeout, *, loop=None):
        """Wait for the single Future or coroutine to complete, with timeout.
    
        Coroutine will be wrapped in Task.
    
        Returns result of the Future or coroutine.  When a timeout occurs,
        it cancels the task and raises TimeoutError.  To avoid the task
        cancellation, wrap it in shield().
    
        If the wait is cancelled, the task is also cancelled.
    
        This function is a coroutine.
        """
        if loop is None:
            loop = events.get_running_loop()
        else:
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)
    
        if timeout is None:
            return await fut
    
        if timeout <= 0:
            fut = ensure_future(fut, loop=loop)
    
            if fut.done():
                return fut.result()
    
            await _cancel_and_wait(fut, loop=loop)
            try:
                return fut.result()
            except exceptions.CancelledError as exc:
                raise exceptions.TimeoutError() from exc
    
        waiter = loop.create_future()
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
        cb = functools.partial(_release_waiter, waiter)
    
        fut = ensure_future(fut, loop=loop)
        fut.add_done_callback(cb)
    
        try:
            # wait until the future completes or the timeout
            try:
                await waiter
            except exceptions.CancelledError:
                if fut.done():
                    return fut.result()
                else:
                    fut.remove_done_callback(cb)
                    # We must ensure that the task is not running
                    # after wait_for() returns.
                    # See https://bugs.python.org/issue32751
                    await _cancel_and_wait(fut, loop=loop)
                    raise
    
            if fut.done():
                return fut.result()
            else:
                fut.remove_done_callback(cb)
                # We must ensure that the task is not running
                # after wait_for() returns.
                # See https://bugs.python.org/issue32751
                await _cancel_and_wait(fut, loop=loop)
                # In case task cancellation failed with some
                # exception, we should re-raise it
                # See https://bugs.python.org/issue40607
                try:
>                   return fut.result()
E                   asyncio.exceptions.CancelledError

/usr/share/miniconda3/envs/test/lib/python3.9/asyncio/tasks.py:490: CancelledError

The above exception was the direct cause of the following exception:

small_client = <Client: 'tls://10.0.21.83:8786' processes=10 threads=20, memory=71.61 GiB>
persist = False, memory_multiplier = 1, configure_shuffling = None

    @pytest.mark.parametrize("persist", [True, False])
    def test_set_index(small_client, persist, memory_multiplier, configure_shuffling):
        memory = cluster_memory(small_client)  # 76.66 GiB
    
        df_big = timeseries_of_size(
            memory * memory_multiplier, dtypes={str(i): float for i in range(100)}
        )  # 66.58 MiB partitions
        df_big["predicate"] = df_big["0"] * 1e9
        df_big = df_big.astype({"predicate": "int"})
        if persist:
            df_big = df_big.persist()
        df_indexed = df_big.set_index("0")
>       wait(df_indexed.size, small_client, 20 * 60)

tests/benchmarks/test_join.py:73: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
tests/utils_test.py:173: in wait
    distributed.wait(p, timeout=timeout)
/usr/share/miniconda3/envs/test/lib/python3.9/site-packages/distributed/client.py:5281: in wait
    result = client.sync(_wait, fs, timeout=timeout, return_when=return_when)
/usr/share/miniconda3/envs/test/lib/python3.9/site-packages/distributed/utils.py:359: in sync
    return sync(
/usr/share/miniconda3/envs/test/lib/python3.9/site-packages/distributed/utils.py:426: in sync
    raise exc.with_traceback(tb)
/usr/share/miniconda3/envs/test/lib/python3.9/site-packages/distributed/utils.py:399: in f
    result = yield future
/usr/share/miniconda3/envs/test/lib/python3.9/site-packages/tornado/gen.py:769: in run
    value = future.result()
/usr/share/miniconda3/envs/test/lib/python3.9/site-packages/distributed/client.py:5249: in _wait
    await future
/usr/share/miniconda3/envs/test/lib/python3.9/site-packages/distributed/utils.py:1927: in wait_for
    return await asyncio.wait_for(fut, timeout)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

fut = <Task cancelled name='Task-24998' coro=<All() done, defined at /usr/share/miniconda3/envs/test/lib/python3.9/site-packages/distributed/utils.py:235>>
timeout = 1200

    async def wait_for(fut, timeout, *, loop=None):
        """Wait for the single Future or coroutine to complete, with timeout.
    
        Coroutine will be wrapped in Task.
    
        Returns result of the Future or coroutine.  When a timeout occurs,
        it cancels the task and raises TimeoutError.  To avoid the task
        cancellation, wrap it in shield().
    
        If the wait is cancelled, the task is also cancelled.
    
        This function is a coroutine.
        """
        if loop is None:
            loop = events.get_running_loop()
        else:
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)
    
        if timeout is None:
            return await fut
    
        if timeout <= 0:
            fut = ensure_future(fut, loop=loop)
    
            if fut.done():
                return fut.result()
    
            await _cancel_and_wait(fut, loop=loop)
            try:
                return fut.result()
            except exceptions.CancelledError as exc:
                raise exceptions.TimeoutError() from exc
    
        waiter = loop.create_future()
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
        cb = functools.partial(_release_waiter, waiter)
    
        fut = ensure_future(fut, loop=loop)
        fut.add_done_callback(cb)
    
        try:
            # wait until the future completes or the timeout
            try:
                await waiter
            except exceptions.CancelledError:
                if fut.done():
                    return fut.result()
                else:
                    fut.remove_done_callback(cb)
                    # We must ensure that the task is not running
                    # after wait_for() returns.
                    # See https://bugs.python.org/issue32751
                    await _cancel_and_wait(fut, loop=loop)
                    raise
    
            if fut.done():
                return fut.result()
            else:
                fut.remove_done_callback(cb)
                # We must ensure that the task is not running
                # after wait_for() returns.
                # See https://bugs.python.org/issue32751
                await _cancel_and_wait(fut, loop=loop)
                # In case task cancellation failed with some
                # exception, we should re-raise it
                # See https://bugs.python.org/issue40607
                try:
                    return fut.result()
                except exceptions.CancelledError as exc:
>                   raise exceptions.TimeoutError() from exc
E                   asyncio.exceptions.TimeoutError

/usr/share/miniconda3/envs/test/lib/python3.9/asyncio/tasks.py:492: TimeoutError
--------------------------- Captured stderr teardown ---------------------------
2023-08-07 17:38:01,999 - coiled - INFO - Cluster 253029 deleted successfully.
---------------------------- Captured log teardown -----------------------------
INFO     coiled-runtime:conftest.py:645 Cluster state dump can be found at: s3://coiled-runtime-ci/test-scratch/cluster_dumps/join-0469d473/benchmarks.test_join.py.test_set_index[1-p2p-False].msgpack.gz
INFO     coiled:core.py:862 Cluster 253029 deleted successfully.
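
Note on reading the traceback: the chain above (a CancelledError inside `All`, then a CancelledError in `wait_for`, then a TimeoutError) is what `asyncio.wait_for` produces when the awaited task does not finish within the timeout. It cancels the task and converts the cancellation into a TimeoutError. A minimal sketch of that behaviour, assuming a stand-in coroutine that never completes (`stuck_computation` and `wait_with_timeout` are hypothetical names, not part of distributed or the benchmark suite):

```python
import asyncio


async def stuck_computation():
    # Hypothetical stand-in for work that never completes. This is NOT the
    # actual P2P shuffle, only something that blocks forever.
    await asyncio.Event().wait()


async def wait_with_timeout(timeout):
    try:
        await asyncio.wait_for(stuck_computation(), timeout=timeout)
    except asyncio.TimeoutError as exc:
        # wait_for cancels the inner task before raising. The chained cause,
        # if present, is the CancelledError coming from that task.
        return "timed out", exc.__cause__
    return "finished", None


status, cause = asyncio.run(wait_with_timeout(0.1))
print(status, type(cause).__name__)
```

The TimeoutError only shows that the wait did not complete within the 20 minute limit (`timeout = 1200`). On its own it does not distinguish a deadlock from very slow progress, so the cluster state dump is the place to look for the actual cause.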
fjetter changed the title from "⚠️ CI failed ⚠️" to "Possible deadlock with P2P DataFrame shuffle" on Aug 8, 2023

fjetter commented Aug 8, 2023

cc @hendrikmakait, could you take a look and see whether anything looks suspicious?


fjetter commented Aug 8, 2023

This happened again in #939.

hendrikmakait commented

#939 does not appear to be the same issue; it looks like a connection timeout instead.

hendrikmakait commented

This should be fixed as of dask/distributed#8088.
