map_overlap tasks fail to deserialize on workers - keywords must be strings #6624

Closed
gjoseph92 opened this issue Jun 24, 2022 · 3 comments · Fixed by #6626
Labels: bug (Something is broken), regression

@gjoseph92 (Collaborator) reported:

import distributed
import numpy as np
import dask.array as da


if __name__ == "__main__":
    v = da.random.random((20, 20), chunks=(5, 5))

    overlapped = da.map_overlap(np.sum, v, depth=2, boundary="reflect")

    client = distributed.Client()
    overlapped.compute()
2022-06-23 19:16:55,060 - distributed.core - ERROR - keywords must be strings
Traceback (most recent call last):
  File "/Users/gabe/dev/distributed/distributed/core.py", line 849, in handle_stream
    handler(**merge(extra, msg))
  File "/Users/gabe/dev/distributed/distributed/worker.py", line 1818, in _
    event = cls(**kwargs)
  File "<string>", line 13, in __init__
  File "/Users/gabe/dev/distributed/distributed/worker_state_machine.py", line 667, in __post_init__
    self.run_spec = SerializedTask(**self.run_spec)  # type: ignore[unreachable]
TypeError: keywords must be strings

...

2022-06-23 19:16:55,068 - distributed.nanny - ERROR - Worker process died unexpectedly
    self._target(*self._args, **self._kwargs)
  File "/Users/gabe/dev/distributed/distributed/nanny.py", line 846, in watch_stop_q
    child_stop_q.close()
  File "/Users/gabe/miniconda3/envs/dask-distributed/lib/python3.9/multiprocessing/queues.py", line 143, in close
    self._reader.close()
  File "/Users/gabe/miniconda3/envs/dask-distributed/lib/python3.9/multiprocessing/connection.py", line 182, in close
    self._close()
  File "/Users/gabe/miniconda3/envs/dask-distributed/lib/python3.9/multiprocessing/connection.py", line 366, in _close
    _close(self._handle)

...

distributed.scheduler.KilledWorker: ("('random_sample-concatenate-1086077ac09ace0ac4330fce33825511', 2, 3)", <WorkerState 'tcp://127.0.0.1:65387', name: 2, status: closed, memory: 0, processing: 32>)

cc @crusaderky @fjetter

Environment:

  • Dask version: dc019ed
  • Python version: 3.9.5
  • Operating System: macOS
  • Install method (conda, pip, source): source
gjoseph92 added the bug (Something is broken) and regression labels on Jun 24, 2022
fjetter self-assigned this on Jun 24, 2022
@fjetter (Member) commented on Jun 24, 2022:

The run_spec at this point has the following shape:

{0: {'shape': (...), 'num-chunks': (...), 'array-location': [...], 'chunk-location': (...)}, None: {'shape': (...), 'num-chunks': (...), 'array-location': [...], 'chunk-location': (...), 'chunk-shape': (...), 'dtype': dtype('float64')}}
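
For context, a minimal standalone sketch of why this shape blows up (FakeSerializedTask is a hypothetical stand-in, not distributed's real class): SerializedTask(**self.run_spec) unpacks the dict with **, and ** only accepts string keys, so the 0 and None keys above produce exactly the TypeError from the traceback.

from typing import Any, NamedTuple

class FakeSerializedTask(NamedTuple):  # stand-in for distributed's SerializedTask
    function: Any = None
    args: Any = None
    kwargs: Any = None
    task: Any = None

scrambled_run_spec = {0: {"shape": (5, 5)}, None: {"dtype": "float64"}}

try:
    FakeSerializedTask(**scrambled_run_spec)  # keys 0 and None are not strings
except TypeError as exc:
    print(exc)  # -> keywords must be strings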

@fjetter (Member) commented on Jun 24, 2022:

I'm not sure whether this is a worker state regression. On the scheduler side, the run_spec is a Serialized object in _task_to_msg, but the comm handler on the worker side already receives the scrambled run_spec shown above. So either something is wrong in our serialization or in how we call the handlers.
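
For reference, a rough sketch of the expected path, assuming distributed.protocol's serialize / deserialize / Serialized behave as their public names suggest: the scheduler ships the run_spec as an opaque Serialized payload, and the worker should be able to round-trip it intact rather than end up with a dict keyed by 0 and None. The payload contents below are placeholders, not the real pickled task.

from distributed.protocol import Serialized, deserialize, serialize

# Placeholder run_spec; the real one carries the pickled function/args/kwargs.
run_spec = {"function": b"<pickled callable>", "args": b"<pickled args>"}

header, frames = serialize(run_spec)
opaque = Serialized(header, frames)  # roughly what the scheduler puts in the compute-task message

restored = deserialize(opaque.header, opaque.frames)
assert restored == run_spec  # keys should arrive as strings, not 0 / None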

@fjetter (Member) commented on Jun 24, 2022:

Git bisect shows that bd98e66 / #6410 is apparently the culprit.

However, before this commit I don't get a successful run either, but rather the following exception instead:

    def test_runspec_regression_sync():
        da = pytest.importorskip("dask.array")
        with Client():
            # https://github.com/dask/distributed/issues/6624
            v = da.random.random((20, 20), chunks=(5, 5))

            overlapped = da.map_overlap(np.sum, v, depth=2, boundary="reflect")
>           overlapped.compute()

distributed/tests/test_client.py:7508:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../dask/dask/base.py:312: in compute
    (result,) = compute(self, traverse=False, **kwargs)
../dask/dask/base.py:601: in compute
    return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
../dask/dask/base.py:601: in <listcomp>
    return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
../dask/dask/array/core.py:1269: in finalize
    return concatenate3(results)
../dask/dask/array/core.py:5165: in concatenate3
    chunks = chunks_from_arrays(arrays)
../dask/dask/array/core.py:4952: in chunks_from_arrays
    result.append(tuple(shape(deepfirst(a))[dim] for a in arrays))
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

.0 = <list_iterator object at 0x1472ede10>

>   result.append(tuple(shape(deepfirst(a))[dim] for a in arrays))
E   IndexError: tuple index out of range

fjetter linked pull request #6626 on Jun 24, 2022 that will close this issue.
jsignell pushed a commit referencing this issue on Jun 24, 2022: Hotfix for #6624, reverting the compute-task message format almost to the original state before #6410.