[dask-on-ray] ValueError on read-only memory #10124

Open
stephanie-wang opened this issue Aug 15, 2020 · 16 comments
Labels
bug (Something that is supposed to be working; but isn't), P2 (Important issue, but not time-critical)

Comments

@stephanie-wang
Contributor

What is the problem?

Ray version and other system information (Python version, TensorFlow version, OS): 0.9dev

I was trying out the new Ray backend plugin for Dask and ran into a read-only error. Looks like the Dask code tries to write in-place, but Ray doesn't allow it since objects are immutable.

Reproduction (REQUIRED)

Please provide a script that can be run to reproduce the issue. The script should have no external library dependencies (i.e., use fake or mock data / environments):

import ray
import dask
import dask.dataframe as dd
import pandas as pd
import numpy as np
from ray.experimental.dask import ray_dask_get

import time

N_ROWS = 1000
N_PARTITIONS = 100

ray.init()

df = dd.from_pandas(pd.DataFrame(np.random.randn(N_ROWS, 2), columns=['a', 'b']), npartitions=N_PARTITIONS)
start = time.time()
print(df.groupby('b').a.sum().compute(scheduler=ray_dask_get))
end = time.time()
print("ray", end - start)

If we cannot run your script, we cannot fix your issue.

  • I have verified my script runs in a clean environment and reproduces the issue.
  • I have verified the issue also occurs with the latest wheels.
@stephanie-wang stephanie-wang added bug Something that is supposed to be working; but isn't triage Needs triage (eg: priority, bug/not-bug, and owning component) labels Aug 15, 2020
@stephanie-wang
Contributor Author

cc @clarkzinzow, do you have any ideas? Also cc @ericl.

@stephanie-wang stephanie-wang added P3 Issue moderate in impact or severity P2 Important issue, but not time-critical and removed triage Needs triage (eg: priority, bug/not-bug, and owning component) P3 Issue moderate in impact or severity labels Aug 15, 2020
@clarkzinzow
Contributor

clarkzinzow commented Aug 15, 2020

@stephanie-wang Just ran the reproduction script and it worked for me off commit 7ffb37f711f0045f1d1199db139e31e58fcbbe00.

In [1]: import ray
   ...: import dask
   ...: import dask.dataframe as dd
   ...: import pandas as pd
   ...: import numpy as np
   ...: from ray.experimental.dask import ray_dask_get
   ...:
   ...: import time
   ...:
   ...: N_ROWS = 1000
   ...: N_PARTITIONS = 100
   ...:
   ...: ray.init()
   ...:
   ...: df = dd.from_pandas(pd.DataFrame(np.random.randn(N_ROWS, 2), columns=['a', 'b']), npartitions=N_PARTITIONS)
   ...: start = time.time()
   ...: print(df.groupby('b').a.sum().compute(scheduler=ray_dask_get))
   ...: end = time.time()
   ...: print("ray", end - start)
   ...:
2020-08-15 00:26:50,891 INFO resource_spec.py:250 -- Starting Ray with 14.7 GiB memory available for workers and up to 7.36 GiB for objects. You can adjust these settings with ray.init(memory=<bytes>, object_store_memory=<bytes>).
2020-08-15 00:26:51,499 INFO services.py:1215 -- View the Ray dashboard at localhost:8265
b
-1.135363   -0.535185
-1.075399   -0.164216
-0.661416    0.511669
-0.222915   -0.929146
 0.416142   -0.553446
               ...
 0.597348    0.610069
 0.619706   -1.589376
 0.723611   -2.171919
 1.380745    0.196464
 1.572565    0.296262
Name: a, Length: 1000, dtype: float64
ray 4.355887413024902

Dask collections generally don't allow in-place mutation; that shouldn't even be possible to express in a Dask graph, so I'm wondering what you're running into here... 🤔

@ericl
Contributor

ericl commented Aug 15, 2020

Traceback (most recent call last):
  File "d.py", line 18, in <module>
    print(df.groupby('b').a.sum().compute(scheduler=ray_dask_get))
  File "/usr/local/lib/python3.6/dist-packages/dask/base.py", line 167, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/dask/base.py", line 447, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/eric/Desktop/ray/python/ray/experimental/dask/scheduler.py", line 92, in ray_dask_get
    result = ray_get_unpack(object_refs)
  File "/home/eric/Desktop/ray/python/ray/experimental/dask/scheduler.py", line 257, in ray_get_unpack
    computed_result = ray.get(object_refs)
  File "/home/eric/Desktop/ray/python/ray/worker.py", line 1552, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(ValueError): ray::ray.experimental.dask.scheduler.dask_task_wrapper() (pid=22464, ip=192.168.5.121)
  File "python/ray/_raylet.pyx", line 437, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 459, in ray._raylet.execute_task
ray.exceptions.RayTaskError: ray::ray.experimental.dask.scheduler.dask_task_wrapper() (pid=22473, ip=192.168.5.121)
  File "python/ray/_raylet.pyx", line 437, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 459, in ray._raylet.execute_task
ray.exceptions.RayTaskError: ray::ray.experimental.dask.scheduler.dask_task_wrapper() (pid=22469, ip=192.168.5.121)
  File "python/ray/_raylet.pyx", line 437, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 459, in ray._raylet.execute_task
ray.exceptions.RayTaskError: ray::ray.experimental.dask.scheduler.dask_task_wrapper() (pid=22473, ip=192.168.5.121)
  File "python/ray/_raylet.pyx", line 474, in ray._raylet.execute_task
  File "/home/eric/Desktop/ray/python/ray/experimental/dask/scheduler.py", line 233, in dask_task_wrapper
    return func(*actual_args)
  File "/usr/local/lib/python3.6/dist-packages/dask/utils.py", line 31, in apply
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/dask/dataframe/groupby.py", line 295, in _apply_chunk
    return func(g[columns], **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/dask/utils.py", line 895, in __call__
    return getattr(obj, self.method)(*args, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/pandas/core/groupby/groupby.py", line 1372, in f
    return self._cython_agg_general(alias, alt=npfunc, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/pandas/core/groupby/groupby.py", line 891, in _cython_agg_general
    obj._values, how, min_count=min_count
  File "/usr/local/lib/python3.6/dist-packages/pandas/core/groupby/ops.py", line 587, in aggregate
    "aggregate", values, how, axis, min_count=min_count
  File "/usr/local/lib/python3.6/dist-packages/pandas/core/groupby/ops.py", line 530, in _cython_operation
    result, counts, values, codes, func, is_datetimelike, min_count
  File "/usr/local/lib/python3.6/dist-packages/pandas/core/groupby/ops.py", line 608, in _aggregate
    agg_func(result, counts, values, comp_ids, min_count)
  File "pandas/_libs/groupby.pyx", line 453, in pandas._libs.groupby._group_add
  File "stringsource", line 658, in View.MemoryView.memoryview_cwrapper
  File "stringsource", line 349, in View.MemoryView.memoryview.__cinit__
ValueError: buffer source array is read-only

I can reproduce this; I believe it's due to some argument being passed to the task wrapper that is backed by a read-only Ray object. It's probably specific to particular dask/pandas/numpy versions.
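
For context, Ray deserializes numpy arrays zero-copy, so a fetched array is a read-only view over the object-store buffer, which fits that hypothesis. A minimal sketch of the underlying behavior (not the Dask code path itself):

import numpy as np
import ray

ray.init()

arr = np.zeros(8)
restored = ray.get(ray.put(arr))

# Zero-copy deserialization: the result is backed by the read-only
# object store buffer rather than a private, writable copy.
print(restored.flags.writeable)  # False

# Any in-place write then fails:
# restored[0] = 1.0  # ValueError: assignment destination is read-only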

@clarkzinzow
Contributor

Do you mind giving your versions of Numpy, Dask and Pandas? I'm on Python 3.7.5 and:

clark@clark-dev:(ws)/ray $ pip freeze | rg "numpy|dask|pandas"
dask==2.20.0
numpy==1.18.1
pandas==0.25.3

@clarkzinzow
Contributor

clarkzinzow commented Aug 15, 2020

Ah thanks for the traceback @ericl!

@ericl
Contributor

ericl commented Aug 15, 2020

dask==2.22.0
numpy==1.17.1
numpy-stl==2.10.1
pandas==0.24.2

@clarkzinzow
Contributor

It looks like this is a recurring Pandas bug; the most recent manifestation is pandas-dev/pandas#31710.
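
For reference, here's a Ray-free sketch of that Pandas bug (assuming an affected Pandas version; on fixed versions the groupby succeeds):

import numpy as np
import pandas as pd

# Build a DataFrame whose underlying block is a read-only ndarray,
# mimicking what a zero-copy deserialized object looks like.
values = np.random.randn(100, 2)
values.setflags(write=False)
df = pd.DataFrame(values, columns=["a", "b"], copy=False)

# On affected Pandas versions the Cython groupby aggregation rejects the
# read-only buffer with: ValueError: buffer source array is read-only
print(df.groupby("b")["a"].sum())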

@clarkzinzow
Contributor

clarkzinzow commented Aug 15, 2020

Also this. It looks like the older bug was fixed for 0.24+, and the more recent bug was fixed for 1.1+.

Edit: Wow, lots of these.

@ericl
Contributor

ericl commented Aug 15, 2020

Hmm, I still see the issue with pandas 1.1.0. Unless pandas has fixed itself to handle read-only arrays properly in all cases, we might have to add defensive copies for all numpy arrays passed as arguments to tasks.

dask==2.22.0
numpy==1.17.1
numpy-stl==2.10.1
pandas==1.1.0

@ericl
Contributor

ericl commented Aug 15, 2020

@stephanie-wang as a workaround you can try this patch:

diff --git a/python/ray/experimental/dask/scheduler.py b/python/ray/experimental/dask/scheduler.py
index b15164525..c738c96c8 100644
--- a/python/ray/experimental/dask/scheduler.py
+++ b/python/ray/experimental/dask/scheduler.py
@@ -1,4 +1,5 @@
 import atexit
+import copy
 from collections import defaultdict
 from multiprocessing.pool import ThreadPool
 import threading
@@ -229,6 +230,7 @@ def dask_task_wrapper(func, repack, *args):
     repacked_args, repacked_deps = repack(args)
     # Recursively execute Dask-inlined tasks.
     actual_args = [_execute_task(a, repacked_deps) for a in repacked_args]
+    actual_args = copy.deepcopy(actual_args)
     # Execute the actual underlying Dask task.
     return func(*actual_args)

@clarkzinzow
Contributor

Upgrading to the most recent Dask still works, but upgrading Pandas breaks it. I'm guessing that this is another case in which Pandas isn't handling read-only arrays properly.

we might have to add defensive copies for all numpy arrays passed as arguments to tasks.

That seems terribly expensive, but until Pandas fixes this, that might be required. Although I'm pretty sure that this issue is exclusive to dataframes, so we'd only have to copy Pandas dataframe arguments; we've hit most of the NumPy surface with the Dask-Ray scheduler and have never hit this error.
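
A sketch of what that selective copy could look like in dask_task_wrapper (the helper below is hypothetical, not an actual Ray API; only Pandas objects get copied, everything else is passed through):

import pandas as pd

def _copy_if_pandas(arg):
    # Hypothetical helper: defensively copy only Pandas objects, since their
    # read-only ndarray blocks are what trips the Cython groupby code.
    if isinstance(arg, (pd.DataFrame, pd.Series)):
        return arg.copy()
    return arg

# In dask_task_wrapper, instead of deep-copying everything:
#     actual_args = [_copy_if_pandas(a) for a in actual_args]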

@clarkzinzow
Contributor

clarkzinzow commented Aug 15, 2020

For this particular case, I think that we just need to add const here to get Cython to accept read-only arrays. And if that's the case, const should probably be added to the values argument of all of the other groupby functions in that module.

@clarkzinzow
Contributor

clarkzinzow commented Aug 15, 2020

And to do that, we'll have to wait for const fused type memory views to be released in Cython.

I'm testing it out with const float64_t[:, :] values just to confirm that this would in fact fix it.

@ericl
Contributor

ericl commented Aug 15, 2020

Hmm yeah, if we insert a copy only for pandas dataframes, that doesn't seem so bad (not sure if the copy will add that much overhead in the first place). We can also make it a config flag.
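
A rough sketch of gating that copy behind a flag (the environment variable name here is made up, purely illustrative):

import os

# Hypothetical opt-out switch; defaults to copying, trading a per-task copy
# of dataframe arguments for correctness on affected Pandas versions.
COPY_PANDAS_ARGS = os.environ.get("RAY_DASK_COPY_PANDAS_ARGS", "1") == "1"

def maybe_copy(arg):
    if COPY_PANDAS_ARGS:
        return _copy_if_pandas(arg)  # helper sketched above
    return arg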

@clarkzinzow
Contributor

I tested it out with const float64_t[:, :] and that fixes it. Building Pandas from source takes a good while.

I don't think we'll be seeing const fused-type memory views in Cython 0.x anytime soon, so a fix of that nature won't land in Pandas for a while.

@clarkzinzow
Contributor

clarkzinzow commented Jan 8, 2021

Note that this particular instance of this issue was fixed in Pandas 1.1.2:

Would it be acceptable to close this with the recommendation that anyone encountering this issue on Pandas 1.{0,1}.x upgrade to 1.1.2+?
