Tackle "ValueError: buffer source array is read-only" #1978

Closed
crusaderky opened this issue May 12, 2018 · 28 comments · Fixed by #3967

Comments

@crusaderky
Collaborator

crusaderky commented May 12, 2018

One of the biggest pitfalls of running dask.array with a distributed scheduler is the dreaded ValueError: buffer source array is read-only.
This error is typical when one runs a memoryview-based Cython kernel in distributed, and it's particularly insidious as it will never show up in unit tests performed with the local dask multithreaded scheduler. It might even not show up when you run your problem on distributed and the arrays just happen to never transit across nodes or to the disk cache (which is exactly what the scheduler will try to achieve if enough RAM and CPU power are available).

In other words, this is a textbook example of an issue that risks appearing in production for the first time!

I can see a few ways of tackling the problem:

  • in Cython: if you ain't writing to an array, it should just work. As this issue has been around for the longest time, I suspect it might not be trivial?
  • in distributed, making sure that all arrays passed to all kernels are writeable
  • in dask.array, making sure that all arrays passed to all kernels are NOT writeable, which actually makes a lot of sense regardless of distributed. This will make the error crop up immediately in any naive unit tests. It will also wreak havoc for many existing dask.array users though. Possibly an opt-in setting?
  • in the distributed docs, with a thorough tutorial on how to reproduce the problem in unit testing and how to change your kernels to fix it, so that it becomes the first result when anybody googles the exception.
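The third option above can be approximated in a test suite without changing dask itself. A minimal sketch, assuming a hypothetical helper `as_readonly` (not part of dask or NumPy) that hands a kernel a read-only view of its input, so that any kernel writing in place fails immediately instead of only on a cluster:

```python
import numpy as np


def as_readonly(a):
    """Hypothetical test helper: return a read-only view of ``a``."""
    view = np.asarray(a).view()
    view.flags.writeable = False
    return view


def scale_in_place(x):
    """Example kernel that mutates its input; fails on read-only arrays."""
    x *= 2
    return x


def scale_copy(x):
    """Equivalent kernel that leaves its input untouched."""
    return x * 2


data = np.arange(4.0)

try:
    scale_in_place(as_readonly(data))
    in_place_error = None
except ValueError as exc:
    # NumPy refuses to write into the read-only view.
    in_place_error = str(exc)

result = scale_copy(as_readonly(data))
```

The in-place kernel raises a `ValueError`, while the copying kernel works and leaves `data` unchanged. The same helper could be applied to each chunk before a kernel runs, though that is an illustration and not an existing dask setting.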

On the last point, I personally solved the problem as follows:

In the kernels:

import scipy.interpolate


def _memoryview_safe(x):
    """Make array safe to run in a Cython memoryview-based kernel. These
    kernels typically break down with the error ``ValueError: buffer source
    array is read-only`` when running in dask distributed.
    """
    if not x.flags.writeable:
        if not x.flags.owndata:
            x = x.copy(order='C')
        x.setflags(write=True)
    return x


def splev(x_new, t, c, k=3, extrapolate=True):
    x_new = _memoryview_safe(x_new)
    t = _memoryview_safe(t)
    c = _memoryview_safe(c)
    spline = scipy.interpolate.BSpline.construct_fast(t, c, k, axis=0, extrapolate=extrapolate)
    return spline(x_new)

In the unit test:

import numpy as np


def test_distributed():
    def ro_array(a):
        a = np.array(a)
        a.setflags(write=False)
        # Return a view of a, so that setting the write flag on the view is not enough
        return a[:]

    t = ro_array([1, 2])
    c = ro_array([10, 20])
    x_new = ro_array([1.5, 1.8])
    splev(x_new, t, c, k=1)

If you comment out any of those calls to _memoryview_safe, the test falls over.
Above I'm calling the kernel directly, but a similar thing can also be invoked from the dask wrapper (probably a more robust design).
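The wrapper-level variant mentioned above could look roughly like the following. This is only a sketch: the decorator name `memoryview_safe` and the stand-in `kernel` are hypothetical, and unlike `_memoryview_safe` it always copies a read-only array rather than flipping the flag on arrays that own their data.

```python
import functools

import numpy as np


def memoryview_safe(func):
    """Hypothetical decorator: pass only writeable arrays to ``func``."""

    def fix(obj):
        # Copy read-only arrays; leave every other argument alone.
        if isinstance(obj, np.ndarray) and not obj.flags.writeable:
            return obj.copy()
        return obj

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        args = tuple(fix(a) for a in args)
        kwargs = {k: fix(v) for k, v in kwargs.items()}
        return func(*args, **kwargs)

    return wrapper


@memoryview_safe
def kernel(x):
    # Stand-in for a Cython kernel that requires a writeable buffer.
    assert x.flags.writeable
    x += 1
    return x


ro = np.zeros(3)
ro.setflags(write=False)
out = kernel(ro)
```

Because the copy happens in the wrapper, the original read-only array is left untouched. The decorated function could then be handed to something like `map_blocks`, so individual kernels do not each need their own guard.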

@mrocklin
Member

mrocklin commented May 13, 2018 via email

@shoyer
Member

shoyer commented May 14, 2018

Support for read-only memoryviews was only recently added into Cython: cython/cython#1869. If I understand that PR correctly, this error should no longer be raised if the Cythonized function does not try to override any data.

So fixing this may be as simple as recompiling all your dependencies with the latest version of Cython.

@jakirkham
Member

Thanks for raising this @crusaderky. Sorry for the slow reply.

Agree this is not a fun issue and it needs a fix.

  • in Cython: if you ain't writing to an array, it should just work. As this issue has been around for the longest time, I suspect it might not be trivial?

We can't do too much about how Cython behaves. Though it sounds like they have solved this in 0.28+, which is good to hear. That said, this can be a problem in vanilla Python if an assignment occurs under the hood in some wrapped function. IOW let's ignore the Cython context for a moment.

  • in distributed, making sure that all arrays passed to all kernels are writeable

  • in dask.array, making sure that all arrays passed to all kernels are NOT writeable, which actually makes a lot of sense regardless of distributed. This will make the error crop up immediately in any naive unit tests. It will also wreak havoc for many existing dask.array users though. Possibly an opt-in setting?

These seem like the most reasonable options IMHO. We could discuss documenting it more (and certainly that could be reasonable while we figure out how to solve this), but that is ultimately a workaround when we want a fix.

First and foremost, this is a consistency problem. So can see value in both solutions just by making things more consistent.

Second the value of Dask IMHO is that it provides a dataflow model. As such, it doesn't really make sense to write to some upstream array in the graph because it breaks this model or "goes against the flow" (couldn't resist 😄). IOW would consider the fact Dask Array allows this currently a bug and one needing a fix. So would be inclined to start forcing arrays in the Dask Array graph to be read-only.

To summarize, we need consistency in this area across schedulers. Having read-only arrays is reasonable in Dask's dataflow model. So making all arrays in Dask read-only would be a reasonable fix to this problem.

@jakirkham
Member

Issue ( dask/dask#3674 ) appears somewhat related.

@lesteve
Member

lesteve commented Jun 27, 2018

Support for read-only memoryviews was only recently added into Cython: cython/cython#1869. If I understand that PR correctly, this error should no longer be raised if the Cythonized function does not try to override any data.

Just a quick comment on this since I was involved in providing feedback on the Cython PR (this read-only problem also happens quite often in a scikit-learn context, or more precisely in a joblib context, which automatically memmaps inputs in read-only mode, so we were quite interested in the functionality). To benefit from the Cython feature you need to add a const to your Cython function signature along these lines:

cpdef func_that_can_take_read_only_array(const double[:] input_array):
    ...

There is a limitation of const memoryviews at the moment: you cannot use a const memoryview with fused types, see cython/cython#1772 for more details. As far as scikit-learn is concerned, this is the main reason we have not moved to using const memoryviews.

@alimanfoo

I've just encountered this working with scikit-allel (cggh/scikit-allel#206). The function that causes this uses a fused type, so +1 for cython supporting const on fused types. Thanks @crusaderky for the workaround.

@indrajitsg

indrajitsg commented Dec 18, 2018

I am facing this issue at times when running dask-ml PCA! Sometimes it runs smoothly, sometimes it throws this error. This is the code that I am submitting:

import dask.array as da
from dask.distributed import Client
from dask_ml.decomposition import PCA

client = Client('XXX.XXX.XXX.XXX:8786')

x_train = da.concatenate([x1, x2, x3], axis = 1)

print(x_train.shape)

# (10000, 87808)

pca = PCA(n_components=900, random_state=87350)
x_pca = pca.fit_transform(x_train)

This is the output that I get:

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<timed exec> in <module>

~/anaconda3/envs/pytorch041/lib/python3.6/site-packages/dask_ml/decomposition/pca.py in fit_transform(self, X, y)
    361         """
    362         # X = check_array(X)
--> 363         U, S, V = self._fit(X)
    364         U = U[:, : self.n_components_]
    365 

~/anaconda3/envs/pytorch041/lib/python3.6/site-packages/dask_ml/decomposition/pca.py in _fit(self, X)
    304             singular_values,
    305             noise_variance,
--> 306             singular_values,
    307         )
    308 

~/anaconda3/envs/pytorch041/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
    395     keys = [x.__dask_keys__() for x in collections]
    396     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 397     results = schedule(dsk, keys, **kwargs)
    398     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    399 

~/anaconda3/envs/pytorch041/lib/python3.6/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2316             try:
   2317                 results = self.gather(packed, asynchronous=asynchronous,
-> 2318                                       direct=direct)
   2319             finally:
   2320                 for f in futures.values():

~/anaconda3/envs/pytorch041/lib/python3.6/site-packages/distributed/client.py in gather(self, futures, errors, maxsize, direct, asynchronous)
   1650             return self.sync(self._gather, futures, errors=errors,
   1651                              direct=direct, local_worker=local_worker,
-> 1652                              asynchronous=asynchronous)
   1653 
   1654     @gen.coroutine

~/anaconda3/envs/pytorch041/lib/python3.6/site-packages/distributed/client.py in sync(self, func, *args, **kwargs)
    668             return future
    669         else:
--> 670             return sync(self.loop, func, *args, **kwargs)
    671 
    672     def __repr__(self):

~/anaconda3/envs/pytorch041/lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
    275             e.wait(10)
    276     if error[0]:
--> 277         six.reraise(*error[0])
    278     else:
    279         return result[0]

~/anaconda3/envs/pytorch041/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
    691             if value.__traceback__ is not tb:
    692                 raise value.with_traceback(tb)
--> 693             raise value
    694         finally:
    695             value = None

~/anaconda3/envs/pytorch041/lib/python3.6/site-packages/distributed/utils.py in f()
    260             if timeout is not None:
    261                 future = gen.with_timeout(timedelta(seconds=timeout), future)
--> 262             result[0] = yield future
    263         except Exception as exc:
    264             error[0] = sys.exc_info()

~/anaconda3/envs/pytorch041/lib/python3.6/site-packages/tornado/gen.py in run(self)
   1131 
   1132                     try:
-> 1133                         value = future.result()
   1134                     except Exception:
   1135                         self.had_exception = True

~/anaconda3/envs/pytorch041/lib/python3.6/site-packages/tornado/gen.py in run(self)
   1139                     if exc_info is not None:
   1140                         try:
-> 1141                             yielded = self.gen.throw(*exc_info)
   1142                         finally:
   1143                             # Break up a reference to itself

~/anaconda3/envs/pytorch041/lib/python3.6/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1495                             six.reraise(type(exception),
   1496                                         exception,
-> 1497                                         traceback)
   1498                     if errors == 'skip':
   1499                         bad_keys.add(key)

~/anaconda3/envs/pytorch041/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
    690                 value = tp()
    691             if value.__traceback__ is not tb:
--> 692                 raise value.with_traceback(tb)
    693             raise value
    694         finally:

~/anaconda3/envs/pytorch041/lib/python3.6/site-packages/sklearn/utils/extmath.py in svd_flip()
    569         max_abs_cols = np.argmax(np.abs(u), axis=0)
    570         signs = np.sign(u[max_abs_cols, xrange(u.shape[1])])
--> 571         u *= signs
    572         v *= signs[:, np.newaxis]
    573     else:

ValueError: output array is read-only

@birdsarah
Contributor

I just ran into this for the first time. I was running on a fresh environment with the latest dask (1.2.2) and distributed (1.28.0). As I hadn't seen it before, I downgraded to dask (1.2.0) and distributed (1.27.0) and the issue stopped.

@jcrist I was wondering if maybe it was to do with the upgrade to series.isin, but maybe it's just a coincidence. Anyway, I thought I would report just in case. My code was very simple:

  • read in a dataframe
  • set dtypes
  • filter
  • repartition
  • save

@jcrist
Member

jcrist commented May 13, 2019

@birdsarah if you could provide a reproducible example causing this issue that would be much appreciated.

@birdsarah
Contributor

I'm afraid I don't have time to work up a minimum reproducible example and I can't share my data right now. Sorry.

@demaheim

demaheim commented Dec 12, 2019

So fixing this may be as simple as recompiling all your dependencies with the latest version of Cython.

I have no experience with Cython. How can I do this?

@TomAugspurger
Member

TomAugspurger commented Dec 12, 2019 via email

@demaheim

To check if using the latest version of Cython fixes my error, I used

cython                    0.29.14

and cloned my dependencies

  • dask
  • dask-ml
  • dask-jobqueue

In each dependency I ran

python setup.py clean
python setup.py build_ext
python setup.py install

but I still get ValueError: output array is read-only.

@lesteve
Member

lesteve commented Dec 19, 2019

About the PCA issue, I would guess it is unlikely that this one is related to Cython. The line that the traceback points to is u *= signs so a simple numpy modification of an array rather than a call to a function defined in a Cython file.

~/anaconda3/envs/pytorch041/lib/python3.6/site-packages/sklearn/utils/extmath.py in svd_flip()
    569         max_abs_cols = np.argmax(np.abs(u), axis=0)
    570         signs = np.sign(u[max_abs_cols, xrange(u.shape[1])])
--> 571         u *= signs
    572         v *= signs[:, np.newaxis]
    573     else:

ValueError: output array is read-only

If I try to sum up my understanding of this issue:

  1. dask-ml PCA has a delayed(sklearn.utils.extmath.svd_flip): https://github.com/dask/dask-ml/blob/6c7ff0e7bd5f45e596d0b371cd893ec87e495cb8/dask_ml/utils.py#L27
  2. sklearn.utils.extmath.svd_flip modifies its inputs in place
  3. sometimes when using distributed you get read-only arrays. That is the part I am a bit fuzzy about. @crusaderky seems to be saying in the top post that these read-only arrays don't happen in most cases (wild guess: maybe related to work-stealing?):

It might even not show up when you run your problem on distributed and the arrays just happen to never transit across nodes or to the disk cache (which is exactly what the scheduler will try to achieve if enough RAM and CPU power are available).

Clarifications on point 3. by dask developers could potentially make it easier to reproduce the problem with a LocalCluster.
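One way to obtain such a read-only array without a cluster is to round-trip it through `tobytes` and `np.frombuffer`. This assumes, as a guess at this point in the thread, that the array was rebuilt from an immutable `bytes` buffer after a transfer. The sketch below uses a simplified, hypothetical stand-in for the in-place step of `svd_flip`, not the scikit-learn function itself:

```python
import numpy as np


def flip_in_place(u, signs):
    # Simplified stand-in for the ``u *= signs`` step in svd_flip.
    u *= signs
    return u


u = np.array([[1.0, -2.0], [3.0, 4.0]])
signs = np.array([1.0, -1.0])

# Rebuild the array from an immutable bytes object; the result is read-only.
u_rebuilt = np.frombuffer(u.tobytes(), dtype=u.dtype).reshape(u.shape)

try:
    flip_in_place(u_rebuilt, signs)
    error = None
except ValueError as exc:
    error = str(exc)

# Copying first gives a writeable array, at the cost of extra memory.
flipped = flip_in_place(u_rebuilt.copy(), signs)
```

The first call fails with a read-only error of the same kind as in the traceback, and the copy succeeds. This illustrates the mechanism only. It does not reproduce the scheduling conditions under which distributed produces such arrays.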

@JSKenyon

JSKenyon commented Feb 24, 2020

Just to confirm, is this still an issue? I believe I am running into it using a combination of blockwise and vanilla Python when mutating an array of data flags. I would prefer not to mutate the array but it is so large that making copies is not really feasible. I am using a LocalCluster and the moment I have n_workers > 1 there is a chance that it bombs out with ValueError: assignment destination is read-only. Of course, as @crusaderky points out, this is not always the case. Currently I am circumventing the problem by copying only if necessary but the array/s in question may be 10s of GB. On bad runs, this is a massive performance hit.

@crusaderky
Collaborator Author

Reproduced with stack as of May 2018; cannot reproduce with latest stack as of Feb 2020. Note how downgrading Cython was not enough to reproduce the issue; I did not investigate which package/version fixed the problem exactly.

POC

demo.pyx

import numpy


cpdef f(double[:] x):
    return numpy.array(x)


cpdef g(const double[:] x):
    return numpy.array(x)

main.py

import dask
import dask.array as da
import dask.threaded
import distributed
import numpy
import pyximport

pyximport.install()
from demo import f, g


def main():
    a1 = da.ones(4, chunks=4)
    a2 = da.from_array(numpy.ones(4), chunks=4)
    client = distributed.Client()

    for scheduler in ('threads', 'distributed'):
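        # Compare the major version numerically; a plain string comparison
        # would misorder versions such as "10.0" or "2020.12.0".
        if int(dask.__version__.split(".")[0]) < 2: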
            kwargs = {"get": client.get if scheduler == "distributed" else dask.threaded.get}
        else:
            kwargs = {"scheduler": scheduler}

        for a in (a1, a2):
            for func in (f, g):
                try:
                    b = a.map_blocks(func, dtype=a.dtype).compute(**kwargs)
                    assert b.tolist() == [1, 1, 1, 1]
                    out = "OK"
                except Exception as e:
                    out = f"{type(e).__name__}: {e}"

                print(f"{scheduler}, {func.__name__}, {a.name.split('-')[0]}: {out}")


if __name__ == "__main__":
    main()

With legacy stack

$ conda create -n legacy python=3.6 cython=0.28.1 distributed=1.21.1 dask=0.17.3 numpy=1.14.3 tornado=5.0.2 clang_osx-64
$ conda activate legacy
$ python main.py 2>/dev/null
threads, f, wrapped: OK
threads, g, wrapped: OK
threads, f, array: OK
threads, g, array: OK
distributed, f, wrapped: OK
distributed, g, wrapped: OK
distributed, f, array: ValueError: buffer source array is read-only
distributed, g, array: OK

With latest stack

$ conda create -n latest python=3.6 cython dask distributed clang_osx-64
$ conda activate latest
$ python main.py 2>/dev/null
threads, f, ones: OK
threads, g, ones: OK
threads, f, array: OK
threads, g, array: OK
distributed, f, ones: OK
distributed, g, ones: OK
distributed, f, array: OK
distributed, g, array: OK

@mrocklin
Member

mrocklin commented Mar 1, 2020 via email

@astrojuanlu

I observed this error again today with latest versions of everything. I'm following this old tutorial by @mrocklin and doing something like:

df = dd.read_csv("../data/yellow_tripdata_2019-*.csv", parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"])
payment_types = {
    1: "Credit Card",
    2: "Cash",
    3: "No Charge",
    4: "Dispute",
    5: "Unknown",
    6: "Voided trip"
}
payment_names = pd.Series(
    payment_types, name="payment_name"
).to_frame()
df2 = df.merge(
    payment_names, left_on="payment_type", right_index=True
)
op = df2.groupby("payment_name")["tip_amount"].mean()
client.compute(op)

Interestingly, when I use 2 .csv files everything works fine, but if I load 4 of them, I get this error:

distributed.worker - WARNING -  Compute Failed
Function:  _apply_chunk
args:      (        VendorID tpep_pickup_datetime tpep_dropoff_datetime  passenger_count  trip_distance  RatecodeID store_and_fwd_flag  ...  mta_tax  tip_amount  tolls_amount  improvement_surcharge  total_amount  congestion_surcharge  payment_name
0              1  2019-01-04 14:08:46   2019-01-04 14:18:10                1           1.70           1                  N  ...      0.5         0.0          0.00                    0.3          9.30                   NaN          Cash
1              1  2019-01-04 14:20:33   2019-01-04 14:25:10                1           0.90           1                  N  ...      0.5         0.0          0.00                    0.3          6.30                   NaN          Cash
13             2  2019-01-04 14:14:45   2019-01-04 14:26:00                5           1.63           1                  N  ...      0.5         0.0          0.00                    0.3          9.80                   NaN          Cash
15             2  2019-01-04 14:49:45   2019-01-04 15:0
kwargs:    {'chunk': <methodcaller: sum>, 'columns': 'tip_amount'}
Exception: ValueError('buffer source array is read-only')

and this traceback when trying to retrieve the result of the future:

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-29-0f07712721c0> in <module>
----> 1 _27.result()

~/.pyenv/versions/3.8.3/envs/dask38/lib/python3.8/site-packages/distributed/client.py in result(self, timeout)
    221         if self.status == "error":
    222             typ, exc, tb = result
--> 223             raise exc.with_traceback(tb)
    224         elif self.status == "cancelled":
    225             raise result

~/.pyenv/versions/3.8.3/envs/dask38/lib/python3.8/site-packages/dask/dataframe/groupby.py in _apply_chunk()
    293         if isinstance(columns, (tuple, list, set, pd.Index)):
    294             columns = list(columns)
--> 295         return func(g[columns], **kwargs)
    296 
    297 

~/.pyenv/versions/3.8.3/envs/dask38/lib/python3.8/site-packages/dask/utils.py in __call__()
    893 
    894     def __call__(self, obj, *args, **kwargs):
--> 895         return getattr(obj, self.method)(*args, **kwargs)
    896 
    897     def __reduce__(self):

~/.pyenv/versions/3.8.3/envs/dask38/lib/python3.8/site-packages/pandas/core/groupby/groupby.py in f()
   1370                 # try a cython aggregation if we can
   1371                 try:
-> 1372                     return self._cython_agg_general(alias, alt=npfunc, **kwargs)
   1373                 except DataError:
   1374                     pass

~/.pyenv/versions/3.8.3/envs/dask38/lib/python3.8/site-packages/pandas/core/groupby/groupby.py in _cython_agg_general()
    888                 continue
    889 
--> 890             result, agg_names = self.grouper.aggregate(
    891                 obj._values, how, min_count=min_count
    892             )

~/.pyenv/versions/3.8.3/envs/dask38/lib/python3.8/site-packages/pandas/core/groupby/ops.py in aggregate()
    584         self, values, how: str, axis: int = 0, min_count: int = -1
    585     ) -> Tuple[np.ndarray, Optional[List[str]]]:
--> 586         return self._cython_operation(
    587             "aggregate", values, how, axis, min_count=min_count
    588         )

~/.pyenv/versions/3.8.3/envs/dask38/lib/python3.8/site-packages/pandas/core/groupby/ops.py in _cython_operation()
    527             )
    528             counts = np.zeros(self.ngroups, dtype=np.int64)
--> 529             result = self._aggregate(
    530                 result, counts, values, codes, func, is_datetimelike, min_count
    531             )

~/.pyenv/versions/3.8.3/envs/dask38/lib/python3.8/site-packages/pandas/core/groupby/ops.py in _aggregate()
    606             agg_func(result, counts, values, comp_ids, rank=1, min_count=-1)
    607         else:
--> 608             agg_func(result, counts, values, comp_ids, min_count)
    609 
    610         return result

pandas/_libs/groupby.pyx in pandas._libs.groupby._group_add()

~/.pyenv/versions/3.8.3/envs/dask38/lib/python3.8/site-packages/pandas/_libs/groupby.cpython-38-x86_64-linux-gnu.so in View.MemoryView.memoryview_cwrapper()

~/.pyenv/versions/3.8.3/envs/dask38/lib/python3.8/site-packages/pandas/_libs/groupby.cpython-38-x86_64-linux-gnu.so in View.MemoryView.memoryview.__cinit__()

ValueError: buffer source array is read-only

Linux Mint 19.3, Python 3.8.5, fresh pyenv with pip install dask[complete] matplotlib graphviz ipywidgets notebook "bokeh<2.1".

@TomAugspurger
Member

@astrojuanlu it's possible that's a separate issue. Pandas has had a decent number of issues (pandas-dev/pandas#34857 and linked issues) where arrays end up being readonly and then blow up when they get to a Cython routine that doesn't expect them.

Or it could be the same issue reported here.

@astrojuanlu

Thanks for the blazing fast response @TomAugspurger! I will keep an eye on this.

@jakirkham
Member

jakirkham commented Jun 19, 2020

At this point, we are allocating bytearrays for received data, which support both reading and writing. However, things like merge_frames and decompress could be constructing bytes objects, which are then handed to Pandas (or other libraries). This is a non-exhaustive list of where bytes might come in during serialization; the point is that there are places where bytes objects could show up when receiving data, and bytes objects are by their nature read-only buffers. To address this, we would either need to eat a copy in multiple places or ask the user to copy to some writable buffer.
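The difference between the two buffer types can be seen with the standard library alone. A small sketch (the variable names are illustrative only) comparing a `memoryview` over `bytes` with one over a `bytearray` copy of the same payload:

```python
payload = bytes(range(4))

readonly_view = memoryview(payload)             # bytes: immutable
writable_view = memoryview(bytearray(payload))  # bytearray: mutable copy

try:
    readonly_view[0] = 255
    write_error = None
except TypeError as exc:
    # Writing through a view of a bytes object is rejected.
    write_error = str(exc)

# The bytearray-backed view accepts the write; the original is untouched.
writable_view[0] = 255
```

Any array built on top of the `bytes`-backed buffer inherits that read-only status, which is why a copy into a writable buffer is needed somewhere along the way.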

@djuarezr

It seems to work with python 3.7 for me with my own data. I tried latest pandas (1.0.5) and dask (2.19) in both python 3.7 and python 3.8. The error happens just with the python 3.8 version.

This test was done with a Windows 7 and a Windows 10 machine working in parallel.

Hope this helps.

@jakirkham
Member

Could someone please file a new issue with an MRE?

@jakirkham
Member

jakirkham commented Jun 22, 2020

My guess is we will want something like PR ( #3918 ).

Edit: Would be good if some people could test it and report back over there 🙂

@jakirkham
Member

That said, I've not managed to get a reproducer together. So am unable to test or develop on this further. @astrojuanlu or @djuarezr, if you are able to supply a reproducer, that would really help us get to some fix here quickly 🙂

@astrojuanlu

My attempt to submit a reproducer at #3943, hope it helps!

@astrojuanlu

Upstream pandas-dev/pandas#34857 was fixed

@jakirkham
Member

Interesting, well it shouldn't be an issue for Dask in either case as we track which buffers are writable and ensure that is preserved when reconstituting objects in deserialization.
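The idea of tracking writability through a round trip can be sketched as follows. This is not distributed's actual serialization code: the `serialize` and `deserialize` functions and the header layout are hypothetical, and only show how a recorded flag could decide between a writable and a read-only buffer on the receiving side.

```python
import numpy as np


def serialize(a):
    """Hypothetical: split an array into a small header and raw bytes."""
    a = np.asarray(a)
    header = {
        "dtype": a.dtype.str,
        "shape": a.shape,
        "writeable": bool(a.flags.writeable),
    }
    return header, a.tobytes()


def deserialize(header, frame):
    """Rebuild the array, restoring the original writeable flag."""
    # A bytearray gives a writable buffer at the cost of one copy;
    # a bytes frame can be reused directly for read-only arrays.
    buffer = bytearray(frame) if header["writeable"] else frame
    a = np.frombuffer(buffer, dtype=header["dtype"])
    return a.reshape(header["shape"])


rw = np.arange(6.0).reshape(2, 3)
ro = rw.copy()
ro.setflags(write=False)

rw_back = deserialize(*serialize(rw))
ro_back = deserialize(*serialize(ro))
```

Here `rw_back` comes back writable and `ro_back` read-only, matching the arrays that were sent.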
