
Double Counting and Issues w/Spilling #4186

Open
quasiben opened this issue Oct 26, 2020 · 5 comments

@quasiben (Member)

cuDF recently made a change (which they are now reverting) that impacted the way memory usage is reported. I'm bringing this up to see if my understanding of memory reporting within Dask is correct and, if so, what we can do to mitigate what I think may be double counting in memory reporting for both device and host memory objects.

When Dask calculates the memory usage of objects, it uses a set of customized functions to report that usage. For example, with NumPy arrays we rely on arr.nbytes: https://github.com/dask/dask/blob/81c24bf05ed59394424527900ae60662c3bb8fa1/dask/sizeof.py#L106

In [8]: from dask.sizeof import sizeof

In [9]: import numpy as np

In [10]: arr = np.arange(100)

In [11]: sizeof(arr)
Out[11]: 800

In [12]: sizeof(arr[:50])
Out[12]: 400

This is great! We can accurately measure not only objects but also views of objects. However, when Dask counts how much memory in total is occupied by the objects it knows about, we may be overestimating how much data we have. Using the example above, if Dask is holding onto the array and also has a slice of the array, Dask would report 1200 bytes. This is incorrect: we are only using 800 bytes, plus a view of that data. This can also be demonstrated with slicing in a DataFrame `map_partitions` call, e.g. `map_partitions(lambda x: x[:10])`, or with `repartition`:

In [1]: import pandas as pd

In [2]: import dask.dataframe as dd

In [3]: from dask.distributed import Client

In [4]: client = Client(n_workers=2)

In [5]: df = pd.DataFrame({'a': range(10), 'b': range(10,20)})

In [6]: f = client.scatter(df)

In [7]: client.who_has()
Out[7]: {'DataFrame-4d616059e7e4818419c7804e6d99a647': ('tcp://127.0.0.1:51980',)}

In [8]: ddf = dd.from_delayed([f])

In [9]: ddf = ddf.repartition(npartitions=5)

In [10]: ddf = ddf.persist()

In [11]: client.who_has()
Out[11]:
{'DataFrame-4d616059e7e4818419c7804e6d99a647': ('tcp://127.0.0.1:51980',),
 "('repartition-5-e4615e339823f99fe7881147f16b0dd3', 2)": ('tcp://127.0.0.1:51980',),
 "('split-1f1981ef01372b95c6ec8ebd9f5c4f63', 0)": (),
 "('from-delayed-split-1f1981ef01372b95c6ec8ebd9f5c4f63', 0)": (),
 "('repartition-5-e4615e339823f99fe7881147f16b0dd3', 1)": ('tcp://127.0.0.1:51980',),
 "('repartition-5-e4615e339823f99fe7881147f16b0dd3', 4)": ('tcp://127.0.0.1:51980',),
 "('repartition-5-e4615e339823f99fe7881147f16b0dd3', 3)": ('tcp://127.0.0.1:51980',),
 "('repartition-5-e4615e339823f99fe7881147f16b0dd3', 0)": ('tcp://127.0.0.1:51980',)}

So now we have 1 DataFrame and 5 views. How much data does Dask report?

In [12]: from dask.sizeof import sizeof

In [13]: client.run(lambda dask_worker: sum([sizeof(d) for d in dask_worker.data.values()]))
Out[13]: {'tcp://127.0.0.1:51980': 13104, 'tcp://127.0.0.1:51981': 0}

In [14]: client.run(lambda dask_worker: [sizeof(d) for d in dask_worker.data.values()])
Out[14]:
{'tcp://127.0.0.1:51980': [2288, 2160, 2164, 2164, 2164, 2164],
 'tcp://127.0.0.1:51981': []}

But the original data is only 2288 bytes:

In [15]: sizeof(df)
Out[15]: 2288

It's important to note that Dask performs two kinds of spilling:

1. Memory monitoring: a callback which runs every 200ms.
2. zict-based spilling: Dask stores data in a zict object which can spill whenever some threshold is met. Dask inserts data into the zict after each task.

So if we are counting more data than we are actually holding onto, we may be spilling prematurely.
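To make the zict-based mechanism concrete, here is a minimal sketch of sizeof-weighted spilling. This is not Dask's actual worker code; the threshold, key names, and spill directory are made up for illustration:

import pickle

import numpy as np
import zict

from dask.sizeof import sizeof

# Hypothetical threshold: once the summed sizeof weights exceed this value,
# the buffer evicts least-recently-used items to the slow (on-disk) store.
target_bytes = 1000

slow = zict.Func(pickle.dumps, pickle.loads, zict.File("spill-directory"))
buf = zict.Buffer(fast={}, slow=slow, n=target_bytes,
                  weight=lambda k, v: sizeof(v))

arr = np.arange(100)
buf["original"] = arr    # weighed as 800 bytes
buf["view"] = arr[:50]   # weighed as another 400 bytes, summed weight 1200

# Only 800 bytes are really allocated, but the summed weight exceeded the
# threshold, so "original" has already been pickled out to disk.

This mirrors the general structure described above: an LRU "fast" mapping backed by a "slow" on-disk store, weighted by sizeof, which is why over-counting views translates directly into earlier spilling.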

We see the same with dask-cuda/device data. In this case we have a relatively simple sizeof calculation for all objects:

https://github.com/rapidsai/dask-cuda/blob/302d1b8d422dbb981a0e36b1c7f14941cfd80ef7/dask_cuda/device_host_file.py#L37-L38

Dask-cuda will similarly build a zict object for storing data:

https://github.com/rapidsai/dask-cuda/blob/302d1b8d422dbb981a0e36b1c7f14941cfd80ef7/dask_cuda/device_host_file.py#L119-L120

This is subject to the same underlying counting issue as Dask generally. However, with device objects these spilling issues can happen earlier, since GPU memory is a more limited resource.

It's also important to note that the sizeof calculation is used to report how much data we transferred. So if we are moving a view from worker A to worker B, we should only count the amount of data moved, not the size of the original data object.
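As a rough illustrative check (this is not how Dask accounts for transfers), serializing a NumPy view copies only the view's data, which lines up with what sizeof reports for the view rather than for its base array:

import pickle

import numpy as np

from dask.sizeof import sizeof

arr = np.arange(100)
view = arr[:50]

print(sizeof(view))             # 400: the view's own bytes
print(len(pickle.dumps(view)))  # roughly 400 bytes of data plus pickle overhead
print(sizeof(arr))              # 800: the base array, which does not move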

First, is this a correct representation of what is happening? If so, this may be a problem which is hard to solve, as we would need to track, when inserting a view of an object, that we are still holding onto the original. Would it be reasonable to try to resolve this problem? Could zict instead look at process memory usage rather than summing all the objects in the zict?

cc @kkraus14 @pentschev @jakirkham

@kkraus14 (Member)

I want to add an additional case that may be problematic: in addition to not deduplicating memory, we aren't accurately tracking memory in the case of views. You could imagine code that looks something like:

import numpy as np
from dask.sizeof import sizeof

arr = np.arange(100)
sizeof(arr)  # 800
arr = arr[50:]
sizeof(arr)  # 400

Here we'll only report 400 bytes of memory usage, but in reality we're still using 800 bytes, since arr is now just a view into the original 800-byte allocation, which is kept alive by reference counting.

With a slice-heavy workload this could become quite problematic, as we'll drastically under-report memory usage, which could lead to memory management problems pretty quickly and easily.
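A hypothetical way to observe the gap described here (not something Dask does today) is to compare the view's reported size against the allocation that its .base attribute keeps alive:

import numpy as np

from dask.sizeof import sizeof

arr = np.arange(100)
view = arr[50:]

print(sizeof(view))      # 400: what Dask reports once only the view is kept
print(view.base.nbytes)  # 800: the allocation the view keeps alive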

@mrocklin (Member)

This sounds correct to me.

I had a brief conversation with @kkraus14 in which I recommended that you stop relying on per-object memory counting, and just rely on the device to tell you how much memory is used. @kkraus14 then told me that this is quite hard/expensive to do, and that per-object tracking is best.

In principle, it is hard to ask Python "How much do all of these things take up in memory together?" I'm totally open to a solution if one exists.
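For NumPy objects specifically, one possible approach, sketched here purely as an illustration and not as an existing Dask feature, is to sum each unique underlying allocation once instead of summing sizeof per object:

import numpy as np

def dedup_nbytes(arrays):
    # Count each underlying allocation once, however many views point at it.
    seen = {}
    for a in arrays:
        base = a
        while isinstance(base.base, np.ndarray):
            base = base.base
        seen[id(base)] = base.nbytes
    return sum(seen.values())

arr = np.arange(100)
print(dedup_nbytes([arr, arr[:50], arr[50:]]))  # 800, not 1600

This only covers NumPy views; pandas, Arrow, and device objects would each need their own notion of an underlying buffer, which is part of what makes a general answer hard.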

@madsbk (Contributor) commented Jan 27, 2021

FYI: we have addressed this issue in Dask-CUDA by implementing spilling of individual CUDA objects: rapidsai/dask-cuda#451. It might be possible to handle main memory objects like NumPy arrays similarly.

@jakirkham (Member)

Yeah I think this would be interesting to upstream if we could. Then Dask-CUDA could just plug in to whatever architecture we have in Distributed. Do you have a sense of what this would take to do Mads? 🙂

@madsbk (Contributor) commented Jan 29, 2021

> Do you have a sense of what this would take to do Mads?

I don't think it is too much work to generalize the solution to work with any kind of object and support spilling to disk. Do you think that the Dask community can accept the limitations of an object wrapper like ProxyObject?
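For context, a generic object wrapper along these lines might look roughly like the sketch below. This is a simplified illustration, not Dask-CUDA's actual ProxyObject, and the class name is invented:

import pickle

class SpillableProxy:
    """Hold either the live object or its serialized bytes, materializing on use."""

    def __init__(self, obj):
        self._obj = obj
        self._serialized = None

    def spill(self):
        # Replace the in-memory object with its serialized form.
        if self._obj is not None:
            self._serialized = pickle.dumps(self._obj)
            self._obj = None

    def _materialize(self):
        # Bring the object back into memory on demand.
        if self._obj is None:
            self._obj = pickle.loads(self._serialized)
            self._serialized = None
        return self._obj

    def __getattr__(self, name):
        # Forward plain attribute access to the (possibly unspilled) object.
        return getattr(self._materialize(), name)

A wrapper like this is not a true instance of the wrapped type (type() checks see the proxy, and dunder methods are not forwarded), which is the sort of limitation the question is getting at.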
