
Individual CUDA object spilling #451

Merged
64 commits merged into branch-0.18 on Jan 6, 2021

Conversation

@madsbk madsbk (Member) commented Dec 1, 2020

This PR introduces a new device host file that uses ProxyObject to implement spilling of individual CUDA objects, as opposed to the current host file, which spills entire keys.

  • Implement spilling of individual objects
  • Handle task level aliasing
  • Handle shared device buffers
  • Write docs

To use, set DASK_JIT_UNSPILL=True
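For reference, a minimal usage sketch (the environment variable is the one named above; the device_memory_limit value below is only an illustrative choice):

import os

os.environ["DASK_JIT_UNSPILL"] = "True"  # set before Dask reads its config

from dask.distributed import Client
from dask_cuda import LocalCUDACluster

if __name__ == "__main__":
    # Workers now proxy individual CUDA objects and spill them just-in-time
    # instead of spilling whole keys.
    cluster = LocalCUDACluster(device_memory_limit="15GB")
    client = Client(cluster)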

Motivation

Aliases at the task level

Consider the following two tasks:

def task1():  # Create list of dataframes
    df1 = cudf.DataFrame({"a": range(10)})
    df2 = cudf.DataFrame({"a": range(10)})
    return [df1, df2]

def task2(dfs):  # Get the second item
    return dfs[1]    

Running the two tasks on a worker, we get something like:

>>> data["k1"] = task1()
>>> data["k2"] = task2(data["k1"])
>>> data
{
    "k1": [df1, df2],
    "k2": df2,
}

Since the current implementation of spilling works on keys and handles each key separately, it overestimates the device memory used: sizeof(df)*3. Even worse, if it decides to spill k2, no device memory is freed, since k1 still holds a reference to df2!

The new spilling implementation fixes this issue by wrapping identical CUDA objects in a shared ProxyObject; in this case, df2 in both k1 and k2 will refer to the same ProxyObject.
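To make the idea concrete, here is a toy sketch of the shared-proxy bookkeeping (the registry class and names are hypothetical, not the actual ProxifyHostFile internals):

class ProxyRegistry:
    """Toy stand-in: one proxy per distinct device object, keyed by id()."""

    def __init__(self):
        self._proxies = {}

    def proxify(self, obj):
        # Aliased objects (same id) get the same proxy, so the device
        # memory they hold is counted -- and spilled -- only once.
        return self._proxies.setdefault(id(obj), {"wrapped": obj})

registry = ProxyRegistry()
df1, df2 = object(), object()                        # stand-ins for CUDA dataframes
k1 = [registry.proxify(df1), registry.proxify(df2)]  # task1 output
k2 = registry.proxify(df2)                           # task2 output
assert k2 is k1[1]                                   # "k2" shares the proxy held in "k1"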

Sharing device buffers

Consider the following code snippet:

>>> data["df"] = cudf.DataFrame({"a": range(10)})
>>> data["grouped"] = shuffle_group(data["df"], "a", 0, 2, 2, False, 2)
>>> data["v1"] = data["grouped"][0]
>>> data["v2"] = data["grouped"][1]

In this case v1 and v2 are separate objects and are handled separately in both the current and the new spilling implementation. However, shuffle_group() in cudf actually returns a single device memory buffer, so v1 and v2 point to the same underlying memory buffer. Thus the current implementation will again overestimate the memory use and spill one of the dataframes without any effect.
The new implementation takes this into account when estimating memory usage and makes sure that either both dataframes are spilled or neither of them is. A rough sketch of the accounting idea follows.
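In this sketch, the helper names get_device_buffers and buffer_size are placeholders standing in for what dask_cuda's device-memory tracking does, not its actual API:

from collections import defaultdict

def device_memory_report(proxies, get_device_buffers, buffer_size):
    """Count each underlying device buffer once and group proxies that share it."""
    owners = defaultdict(list)   # id(buffer) -> proxies referencing that buffer
    sizes = {}                   # id(buffer) -> size in bytes
    for proxy in proxies:
        for buf in get_device_buffers(proxy):
            owners[id(buf)].append(proxy)
            sizes[id(buf)] = buffer_size(buf)
    total = sum(sizes.values())  # shared buffers are not double counted
    # Proxies sharing a buffer must be spilled together for any memory to be freed.
    shared_groups = [group for group in owners.values() if len(group) > 1]
    return total, shared_groups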

cc. @beckernick, @VibhuJawa
xref: dask/distributed#3756

@madsbk madsbk changed the title Individual CUDA object spilling [WIP] Individual CUDA object spilling Dec 1, 2020
@madsbk madsbk added the 2 - In Progress (Currently a work in progress) label Dec 1, 2020
@beckernick beckernick (Member) commented Dec 1, 2020

From our initial tests, this works exactly as we had all hoped. Memory is well behaved, and the performance impact is significant. Query results also pass the correctness checks.

Setup:

  • 8 GPUs of a DGX-2 (GPUs 0-7)
  • DEVICE_MEMORY_LIMIT="15GB"
  • POOL_SIZE="30GB"
  • TCP communication
  • SF1K data
  • Reading parquet files comprising 2GB in-memory data chunks from the local /raid of the DGX-2

Q02 Standard: 300 seconds
Q02 Object Spilling: 85 seconds

Q03 Standard: 295 seconds
Q03 Object Spilling: 100 seconds

Q04 Standard: 305 seconds
Q04 Object Spilling: 90 seconds

cc @quasiben @kkraus14

EDIT: Will be doing a full sweep of the queries

@quasiben quasiben (Member) commented Dec 1, 2020

Those are significant improvements!

@beckernick (Member)

Some queries are failing during equality comparisons (ProxyObjects don't support equality ops). It looks like there are several other errors, but we will need to distinguish between them and other issues.

For the queries where this succeeds, it's fantastic.

@beckernick beckernick (Member) commented Dec 1, 2020

There are no failures in the standard environment. Will be using this comment to track object spilling failures with @madsbk. These are not meant to be the full tracebacks -- just a log.

Q01, Q07
TypeError: '<' not supported between instances of 'ProxyObject' and 'ProxyObject'
TypeError: '>' not supported between instances of 'ProxyObject' and 'ProxyObject'
Likely coming from the less-than operation in the custom task https://github.com/rapidsai/gpu-bdb/blob/1dc201bdb4542213df265c87b21e8e989291cacc/tpcx_bb/queries/q01/tpcx_bb_query_01.py#L78

Q05
TypeError: Implicit conversion to a host NumPy array via __array__ is not allowed, To explicitly construct a GPU array, consider using cupy.asarray(...)
To explicitly construct a host array, consider using .to_array()
Likely somewhere in build_and_predict_model: https://github.com/rapidsai/gpu-bdb/blob/main/tpcx_bb/queries/q05/tpcx_bb_query_05.py#L75

Q16
distributed.protocol.pickle - INFO - Failed to serialize ([['i_item_id', <dask_cuda.proxy_object.ProxyObject at 0x7f1cc6e1afa0 of cudf.core.series.Series at 0x7f1cc6e13e60>]],). Exception: args[0] from newobj args has the wrong class
Encountered Exception while running query
distributed.protocol.pickle - INFO - Failed to serialize (subgraph_callable, "('read-parquet-5a6657fa560358b3b3932b8954051ef0', 0)", 'w_state_code', (subgraph_callable, (subgraph_callable, (subgraph_callable, "('read-parquet-5a6657fa560358b3b3932b8954051ef0', 0)", ['w_state']), {'w_state': <dask_cuda.proxy_object.ProxyObject at 0x7f1cc6e69730 of cudf.core.series.Series at 0x7f1cc6e697d0>}, None, 'getitem-6cdeec5813df85c43710a7d82d0884b6'), 'w_state')). Exception: args[0] from newobj args has the wrong class
Traceback (most recent call last):
File "/raid/nicholasb/miniconda3/envs/rapids-tpcxbb-20201201-spill/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 49, in dumps
result = pickle.dumps(x, **dump_kwargs)
_pickle.PicklingError: args[0] from newobj args has the wrong class

Q15, Q17
File "/raid/nicholasb/miniconda3/envs/rapids-tpcxbb-20201201-spill/lib/python3.7/site-packages/dask_cuda/proxy_object.py", line 256, in __getattr__
return getattr(self._obj_pxy_deserialize(), name)
AttributeError: 'tuple' object has no attribute 'copy'

Q21
This query appears to finish, but then, just before the final ops, it goes back and repeats the second half of the shuffle phase and all associated work, repeatedly (forever).

Q30
f"{obj.__class__.__name__} object is not iterable. "
TypeError: MultiIndex object is not iterable. Consider using .to_arrow(), .to_pandas() or .values_host if you wish to iterate over the values.

Q12
File "/raid/nicholasb/miniconda3/envs/rapids-tpcxbb-20201201-spill/lib/python3.7/site-packages/cudf/core/column_accessor.py", line 267, in _select_by_label_grouped
result = self._grouped_data[key]
KeyError: False

Q23, Q24
df.index = df[index_cols].copy(deep=False)
AttributeError: 'ProxyObject' object has no attribute 'index'

Q29
File "tpcx_bb_query_29.py", line 139, in main
grouped_df.columns = ["category_id_1", "category_id_2", "cnt"]
AttributeError: 'ProxyObject' object has no attribute 'columns'

Q10, Q18, Q19, Q27
File "cudf/_lib/merge.pyx", line 36, in cudf._lib.merge.merge_sorted
TypeError: Cannot convert ProxyObject to cudf._lib.table.Table

Q28
File "<__array_function__ internals>", line 6, in concatenate
ValueError: object array method not producing an array

Q20, Q25, Q26 (Fixed by dask/dask#6927)
TypeError: Expected meta to specify scalar, got cudf.core.series.Series

@jakirkham (Member)

My guess is ProxyObjects will need to convert themselves back to real objects when performing those comparisons.

@madsbk madsbk added the improvement (Improvement / enhancement to an existing function) and non-breaking (Non-breaking change) labels Dec 2, 2020
@madsbk madsbk force-pushed the object_spilling branch 3 times, most recently from e18c3ef to 159fa02 on December 2, 2020 20:35
@jakirkham (Member)

My guess is ProxyObjects will need to convert themselves back to real objects when performing those comparisons.

Done in PR (#458).
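For context, the pattern is simply to unwrap before comparing; a toy sketch of that idea (not the actual ProxyObject code in #458):

import operator

class MiniProxy:
    """Toy proxy that defers comparisons to the wrapped (deserialized) object."""

    def __init__(self, obj):
        self._obj = obj

    def _deserialize(self):
        # A real proxy would unspill/deserialize here; the toy just returns the object.
        return self._obj

    def __lt__(self, other):
        other = other._deserialize() if isinstance(other, MiniProxy) else other
        return operator.lt(self._deserialize(), other)

assert MiniProxy(1) < MiniProxy(2)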

@codecov-io codecov-io commented Dec 2, 2020

Codecov Report

Merging #451 (81668b1) into branch-0.18 (b170b29) will increase coverage by 0.73%.
The diff coverage is 94.26%.

Impacted file tree graph

@@               Coverage Diff               @@
##           branch-0.18     #451      +/-   ##
===============================================
+ Coverage        90.40%   91.14%   +0.73%     
===============================================
  Files               15       18       +3     
  Lines             1126     1446     +320     
===============================================
+ Hits              1018     1318     +300     
- Misses             108      128      +20     
| Impacted Files | Coverage Δ |
|---|---|
| dask_cuda/cli/dask_cuda_worker.py | 96.92% <ø> (+0.09%) ⬆️ |
| dask_cuda/device_host_file.py | 90.69% <66.66%> (-8.17%) ⬇️ |
| dask_cuda/cuda_worker.py | 76.66% <75.00%> (-0.35%) ⬇️ |
| dask_cuda/proxify_device_objects.py | 85.71% <85.71%> (ø) |
| dask_cuda/get_device_memory_objects.py | 89.04% <89.04%> (ø) |
| dask_cuda/proxy_object.py | 90.62% <95.91%> (+2.82%) ⬆️ |
| dask_cuda/local_cuda_cluster.py | 81.17% <100.00%> (+0.68%) ⬆️ |
| dask_cuda/proxify_host_file.py | 100.00% <100.00%> (ø) |
| ... and 3 more | |

Continue to review full report at Codecov.

Legend:
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update b170b29...81668b1.

@madsbk madsbk (Member Author) commented Dec 3, 2020

@beckernick, I have updated your error log. Q01, Q05, and Q07 should now work. Please re-add them if they still fail for you.

@beckernick beckernick (Member) commented Dec 3, 2020

👍 I see this also. Just for tracking's sake, I'm going to redo your comment edits as strikethrough.

EDIT: Updated the tracking comment to reflect new successes as well as changed errors.

@madsbk madsbk mentioned this pull request Dec 3, 2020
@madsbk madsbk force-pushed the object_spilling branch 9 times, most recently from 93bf3e1 to 4c16d5e on December 14, 2020 08:26
@madsbk madsbk changed the base branch from branch-0.17 to branch-0.18 December 14, 2020 08:26
madsbk added a commit to madsbk/dask-cuda that referenced this pull request Dec 16, 2020
@madsbk madsbk (Member Author) commented Dec 16, 2020

@beckernick, all queries seem to work now. Can you confirm that they also work for you?
Note: you need dask master from today, which includes dask/dask#6927 or dask/dask#6981, to support Q28.

@madsbk madsbk added the 3 - Ready for Review (Ready for review by team) label Dec 17, 2020
@madsbk madsbk changed the title [WIP] Individual CUDA object spilling Individual CUDA object spilling Dec 17, 2020
@madsbk madsbk (Member Author) commented Dec 21, 2020

Test and evaluation of NVTabular's Criteo/DLRM Preprocessing Benchmark

Running a 10-day dataset on a DGX-2, everything works and achieves a 1.48x speedup. In order to force device-to-host memory spilling, I had to set the device limit to 6.5GB.

|                    | Memory peak | Runtime | Speedup |
|--------------------|-------------|---------|---------|
| Old-style spilling | 20 GB       | 408 sec | 1       |
| JIT unspill        | 17 GB       | 276 sec | 1.48    |

The commands, which depend on Dask master:

# Old style spilling
DASK_JIT_UNSPILL=False python NVTabular/examples/dask-nvtabular-criteo-benchmark.py --data-path /datasets/criteo/crit_orig_pq_10days --out-path output -d "8,9,10,11,12,13,14,15" --cats-on-device --cat-cache-high device --device-limit-frac 0.2

# JIT unspill
DASK_JIT_UNSPILL=True  python NVTabular/examples/dask-nvtabular-criteo-benchmark.py --data-path /datasets/criteo/crit_orig_pq_10days --out-path output -d "8,9,10,11,12,13,14,15" --cats-on-device --cat-cache-high device --device-limit-frac 0.2

Based on the results from the NVTabular and TPCx-BB workflows, I think this PR is ready for reviews.

@pentschev pentschev (Member) left a comment

From a high level, I think this PR looks good; I have a few proposed changes and questions below.

Overall the PR is very complex, and it is difficult to keep track of all the important details when reviewing, so in all honesty I couldn't mentally validate whether all the connections between the different classes and the various data type registrations are indeed correct. I think the best approach here is to have this as an experimental feature (as currently proposed) for some time, to allow people to validate the system's functionality.

Thanks @madsbk for the huge effort you put into this problem!

{
"device_memory_limit": parse_device_memory_limit(
device_memory_limit, device_index=i
),
Member:

The fact that there's no memory_limit here seems to indicate that there's no capability for host<->disk spilling; is that intended? We also seem to be missing local_directory, which will prevent users from storing things anywhere but the current directory; that seems problematic for certain use cases.

Member:

Ok, I now see you added a comment about memory_limit in LocalCUDACluster docs. Can you do the same in

@click.option(
"--enable-jit-unspill/--disable-jit-unspill",
default=None, # If not specified, use Dask config
help="Enable just-in-time unspilling",
?

The local_directory question is still valid though.

Member Author:

Yes, in this PR we will not support any spilling to disk. I have removed memory_limit and local_directory to make this clear. I don't think local_directory is used for anything other than disk spilling?

It will be up to a future PR to implement disk spilling, which shouldn't be too difficult.

Member:

My understanding was that local_directory was used for more than just spilling, so I went on a hunt. To be fair, it's still unclear to me where in the code local_directory is really used. I did find one other case where it's used, though: when you upload files, see for example https://github.com/dask/distributed/blob/607cfd2ce00edd44c99da3273de0763a426dda7d/distributed/tests/test_worker.py#L176-L202 . That isn't to say this is the only other case, but I couldn't confirm or deny whether there are other cases besides spilling and file uploading. Maybe @quasiben or @jakirkham would know.

a = proxy_object.asproxy(org.copy())
b = proxy_object.asproxy(org.copy())
res1 = tensordot_lookup(a, b).flatten()
res2 = tensordot_lookup(org.copy(), org.copy()).flatten()
Member:

I see you're using tensordot_lookup and einsum_lookup below. Could we instead test np.tensordot and np.einsum, as users would call them?

Member Author:

Changed to dask.array.tensordot() and dask.array.einsum()

assert k2._obj_pxy_serialized()
assert not dhf["k4"]._obj_pxy_serialized()

# Deleting k2 does change anything since k3 still holds a
Member:

Did you mean to write "Deleting k2 does NOT change anything" ?

Member Author:

I have changed the test to use dhf.proxies_tally for the check.

Other review threads (outdated, resolved):
  • dask_cuda/tests/test_proxify_host_file.py
  • dask_cuda/proxy_object.py
  • dask_cuda/proxify_device_objects.py (two threads)
  • dask_cuda/proxify_host_file.py (two threads)
  • dask_cuda/get_device_memory_objects.py
@madsbk madsbk (Member Author) commented Jan 5, 2021

@pentschev thanks for the review, much appreciated. I have addressed all of your suggestions, I think :)

@pentschev (Member)

Thanks @madsbk for addressing those. It seems that the failing tests are legit; all three seem to fail with similar errors:

dask_cuda/tests/test_proxy.py::test_proxy_object_parquet *** stack smashing detected ***: <unknown> terminated
Fatal Python error: Aborted

@madsbk madsbk (Member Author) commented Jan 5, 2021

Thanks @madsbk for addressing those. It seems that the failing tests are legit; all three seem to fail with similar errors:

I think it is a cuDF bug: rapidsai/cudf#7074
I changed test_proxy_object_parquet() to use the pyarrow engine.

@pentschev pentschev (Member) left a comment

Thanks for checking the failing tests, @madsbk. To the extent that I was capable of verifying this PR, everything looks good to me now. I'm approving but will wait until tomorrow to merge, to give others a chance to review it as well.
