Specify resources for dask builtin functions #2127

Open
kkraus14 opened this issue Jul 18, 2018 · 14 comments

@kkraus14
Member

I'm trying to specify resources for builtin dask functions such as dd.read_csv, with the end goal of running certain functions on "CPU workers" and other functions on "GPU workers". Here's a minimal example (using dd.from_pandas rather than dd.read_csv for simplicity) of trying to force the computation to run only on my "CPU worker":

from dask.distributed import Client, LocalCluster
import pandas as pd
import dask.dataframe as dd

cluster = LocalCluster(processes=False)
cpu_worker = cluster.workers[0]
cpu_worker.name = 'cpu'
cpu_worker.set_resources(CPU=80)
client = Client(cluster)
pdf = pd.DataFrame({"a": [1,2,3], "b": [4,5,6]})
test_df = dd.from_pandas(pdf, npartitions=2)
test_df.compute(resources = {tuple(test_df.__dask_keys__()): {'CPU': 1}})

This returns the following:

distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/distributed/protocol/core.py", line 96, in loads
    msg = loads_msgpack(small_header, small_payload)
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/distributed/protocol/core.py", line 173, in loads_msgpack
    return msgpack.loads(payload, encoding='utf8')
  File "msgpack/_unpacker.pyx", line 200, in msgpack._unpacker.unpackb
TypeError: unhashable type: 'list'
distributed.scheduler - ERROR - unhashable type: 'list'
Traceback (most recent call last):
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/distributed/scheduler.py", line 1929, in handle_client
    msgs = yield comm.read()
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/tornado/gen.py", line 1099, in run
    value = future.result()
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/asyncio/futures.py", line 294, in result
    raise self._exception
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/tornado/gen.py", line 1107, in run
    yielded = self.gen.throw(*exc_info)
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/distributed/comm/tcp.py", line 203, in read
    msg = yield from_frames(frames, deserialize=self.deserialize)
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/tornado/gen.py", line 1099, in run
    value = future.result()
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/asyncio/futures.py", line 294, in result
    raise self._exception
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/tornado/gen.py", line 315, in wrapper
    yielded = next(result)
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/distributed/comm/utils.py", line 75, in from_frames
    res = _from_frames()
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/distributed/comm/utils.py", line 61, in _from_frames
    return protocol.loads(frames, deserialize=deserialize)
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/distributed/protocol/core.py", line 96, in loads
    msg = loads_msgpack(small_header, small_payload)
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/distributed/protocol/core.py", line 173, in loads_msgpack
    return msgpack.loads(payload, encoding='utf8')
  File "msgpack/_unpacker.pyx", line 200, in msgpack._unpacker.unpackb
TypeError: unhashable type: 'list'
distributed.utils - ERROR - unhashable type: 'list'
Traceback (most recent call last):
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/distributed/utils.py", line 622, in log_errors
    yield
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/distributed/client.py", line 921, in _handle_report
    six.reraise(*clean_exception(**msg))
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/six.py", line 692, in reraise
    raise value.with_traceback(tb)
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/asyncio/futures.py", line 294, in result
    raise self._exception
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/tornado/gen.py", line 1107, in run
    yielded = self.gen.throw(*exc_info)
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/distributed/comm/tcp.py", line 203, in read
    msg = yield from_frames(frames, deserialize=self.deserialize)
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/tornado/gen.py", line 1099, in run
    value = future.result()
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/asyncio/futures.py", line 294, in result
    raise self._exception
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/tornado/gen.py", line 315, in wrapper
    yielded = next(result)
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/distributed/comm/utils.py", line 75, in from_frames
    res = _from_frames()
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/distributed/comm/utils.py", line 61, in _from_frames
    return protocol.loads(frames, deserialize=deserialize)
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/distributed/protocol/core.py", line 96, in loads
    msg = loads_msgpack(small_header, small_payload)
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/distributed/protocol/core.py", line 173, in loads_msgpack
    return msgpack.loads(payload, encoding='utf8')
  File "msgpack/_unpacker.pyx", line 200, in msgpack._unpacker.unpackb
TypeError: unhashable type: 'list'

It would be great if you could specify resources as you create tasks, as opposed to when computing them, similar to how you can with client.submit, e.g.:

test_df = dd.from_pandas(pdf, npartitions=2, resources={'CPU': 1})
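
(For comparison, here is roughly what the per-task version of this looks like with client.submit, where each call carries its own resource constraint. This is just a sketch: the functions below are placeholders, and it assumes the client from above is connected to workers that advertise CPU and GPU resources.)

def load(path):
    # placeholder for a CPU-side step, e.g. reading a csv
    return path

def transform(data):
    # placeholder for a GPU-side step
    return data

loaded = client.submit(load, "/path/to/some/file", resources={'CPU': 1})
result = client.submit(transform, loaded, resources={'GPU': 1})
result.result()
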
@mrocklin
Member

Hrm, so this works fine for me on both master and the latest release:

In [1]: from dask.distributed import Client, LocalCluster
   ...: import pandas as pd
   ...: import dask.dataframe as dd
   ...: cluster = LocalCluster(processes=False)
   ...: cpu_worker = cluster.workers[0]
   ...: cpu_worker.name = 'cpu'
   ...: cpu_worker.set_resources(CPU=80)
   ...: client = Client(cluster)
   ...: pdf = pd.DataFrame({"a": [1,2,3], "b": [4,5,6]})
   ...: test_df = dd.from_pandas(pdf, npartitions=2)
   ...: test_df.compute(resources = {tuple(test_df.__dask_keys__()): {'CPU': 1}})
   ...: 
Out[1]: 
   a  b
0  1  4
1  2  5
2  3  6

I might also suggest the following test which sets up resources and names when creating the workers and verifies that tasks are allocated appropriately by checking the structured log.

from dask.distributed import Client, LocalCluster
import pandas as pd
import dask.dataframe as dd

cluster = LocalCluster(n_workers=0, processes=False)
client = Client(cluster)
alice = cluster.start_worker(resources={'CPU': 80}, name='alice')
bob = cluster.start_worker(name='bob')

pdf = pd.DataFrame({"a": [1,2,3], "b": [4,5,6]})
ddf = dd.from_pandas(pdf, npartitions=2)
ddf.compute(resources = {tuple(ddf.__dask_keys__()): {'CPU': 1}})

assert alice.log      # alice, the only worker with the CPU resource, should have run the tasks
assert not bob.log    # bob should have received nothing

@mrocklin
Member

The exception is odd. If you were using something other than LocalCluster I would guess that you had a version mismatch between your workers or between your workers and client, but given that everything is local I don't see how this could be. How did you install Dask? I don't suppose you can provide a conda environment.yml or something similar that reproduces the problem? (My guess would be that this is challenging, but I thought I'd ask anyway.)
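
Even just the installed versions would help narrow things down, e.g.:

import dask
import distributed
print(dask.__version__, distributed.__version__)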

@kkraus14
Member Author

I was on Dask 0.17.2 and just confirmed that the exception is resolved after upgrading to Dask 0.18.1. Thanks!

I'm planning on chaining together a number of functions; is there any way to specify the resources when calling the functions, as opposed to when calling .compute()?
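
For example (just a sketch reusing the per-key mechanism from above), would the expectation be to build one resources dict covering each intermediate collection and pass it all to the final compute call, something like this?

import pandas as pd
import dask.dataframe as dd

# assumes a Client connected to workers that advertise CPU and GPU resources
pdf = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
ddf = dd.from_pandas(pdf, npartitions=2)

# a made-up two-step chain; which step needs which resource is only illustrative
step1 = ddf.assign(c=ddf.a + ddf.b)
step2 = step1.sum()

resources = {
    tuple(step1.__dask_keys__()): {'CPU': 1},
    tuple(step2.__dask_keys__()): {'GPU': 1},
}
result = step2.compute(resources=resources)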

@mrocklin
Member

mrocklin commented Jul 18, 2018 via email

@kkraus14
Member Author

So for dd.read_csv, if I call __dask_keys__() it only returns the from-delayed tasks, while it looks like there are also pandas_read_text and read-block tasks, which end up getting scheduled on the GPU workers. Is there a different function, or a snippet which, given an object, returns every key that we need to define resources for?
E.g.:

test = dd.read_csv("/path/to/some/file")
resources = {tuple(test.getallkeys()): {'CPU': 1}}
test.compute()

@mrocklin
Member

mrocklin commented Jul 18, 2018 via email

@kkraus14
Member Author

Hmm, I'd expect the following to work, but it's still scheduling tasks on the GPU workers, including the from-delayed tasks:

test = dd.read_csv("/path/to/some/file")
resources = {tuple(test.dask): {'CPU': 1}}
test.compute()

@mrocklin
Member

mrocklin commented Jul 18, 2018 via email

@kkraus14
Member Author

Still the same behavior. (Note: my example above forgot to specify resources in the compute call, but I am in fact setting it while testing.)

@mrocklin
Member

I'll take a look sometime today.

@mrocklin
Member

OK, it looks like this is failing to support tuple-based keys (the ('name', i)-style keys that dask collections use) in the .get path. Should be an easy fix.

Short term you could do this as a workaround:

result = client.compute(df).result()

My apologies for the dust here. Most users of resources historically have been doing more custom computations (delayed, futures) and have been using the client API. The code paths around using them with the standard collections (array, dataframe) have not been as well travelled. I'll push a fix for this in a bit.
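
In full, the workaround with the resource constraint attached would look roughly like this (a sketch, with df as the dataframe in question):

# route through the client API, which handles these keys correctly
future = client.compute(df, resources={tuple(df.__dask_keys__()): {'CPU': 1}})
result = future.result()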

@mrocklin
Member

If you use optimize_graph=False then #2131 should solve your immediate issue. There is still a bit of work to clear up this situation generally, though, and to make it more usable.
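
Concretely, with that fix the earlier pattern should behave as expected, roughly along these lines (a sketch, with df as the dataframe in question):

result = df.compute(
    resources={tuple(df.dask): {'CPU': 1}},  # every key in the low-level graph
    optimize_graph=False,  # so the keys named in resources match what reaches the scheduler
)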

@kkraus14
Member Author

@mrocklin Unfortunately I have some pretty hard time constraints for what I'm working on, and creating 8 dask workers with a single GPU visible is working well enough for my needs currently. I'll hopefully have time to revisit this late next week to continue troubleshooting with you towards a solution. Apologies for the delay!

@mrocklin
Member

It's just fine. This has been a useful exercise to flush out some bugs, both technical bugs and usability bugs, in using resources with collections.

Good luck!
