[EM] Add tests for irregular data shapes. (#10980)
- More tests.
- Recommend the arena allocator in the documentation.
trivialfis authored Nov 5, 2024
1 parent 3b8f432 commit ccc5f05
Showing 8 changed files with 64 additions and 18 deletions.
3 changes: 2 additions & 1 deletion demo/dask/forward_logging.py
@@ -1,4 +1,5 @@
"""Example of forwarding evaluation logs to the client
"""
Example of forwarding evaluation logs to the client
===================================================
The example runs on GPU. Two classes are defined to show how to use Dask builtins to
22 changes: 17 additions & 5 deletions demo/guide-python/distributed_extmem_basic.py
@@ -13,6 +13,7 @@
If `device` is `cuda`, the following are also needed:
- cupy
- python-cuda
- rmm
"""
@@ -104,11 +105,22 @@ def setup_rmm() -> None:
    if not xgboost.build_info()["USE_RMM"]:
        return

    # The combination of pool and async is by design. As XGBoost needs to allocate large
    # pages repeatedly, it's not easy to handle fragmentation. We can use more experiments
    # here.
    mr = rmm.mr.PoolMemoryResource(rmm.mr.CudaAsyncMemoryResource())
    rmm.mr.set_current_device_resource(mr)
    try:
        from cuda import cudart
        from rmm.mr import ArenaMemoryResource

        status, free, total = cudart.cudaMemGetInfo()
        if status != cudart.cudaError_t.cudaSuccess:
            raise RuntimeError(cudart.cudaGetErrorString(status))

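        # Size the arena to ~90% of the device's total memory so that the large,
        # repeated page allocations are served from a single arena, which keeps
        # fragmentation in check.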
        mr = rmm.mr.CudaMemoryResource()
        mr = ArenaMemoryResource(mr, arena_size=int(total * 0.9))
    except ImportError:
        # The combination of pool and async is by design. As XGBoost needs to allocate
        # large pages repeatedly, it's not easy to handle fragmentation. We can use more
        # experiments here.
        mr = rmm.mr.PoolMemoryResource(rmm.mr.CudaAsyncMemoryResource())
    rmm.mr.set_current_device_resource(mr)
    # Set the allocator for cupy as well.
    cp.cuda.set_allocator(rmm_cupy_allocator)

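A small usage note, as a hedged sketch rather than part of the commit: after calling the setup_rmm helper above, the active RMM resource can be inspected to see whether the arena path or the pool/async fallback was taken (assuming rmm is installed and setup_rmm is in scope).

import rmm

setup_rmm()
# The class name of the active resource tells us which branch was taken:
# ArenaMemoryResource when cuda-python and the arena allocator are available,
# PoolMemoryResource otherwise.
print(type(rmm.mr.get_current_device_resource()).__name__)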
1 change: 1 addition & 0 deletions doc/conf.py
@@ -294,6 +294,7 @@ def is_readthedocs_build():
"dask": ("https://docs.dask.org/en/stable/", None),
"distributed": ("https://distributed.dask.org/en/stable/", None),
"pyspark": ("https://spark.apache.org/docs/latest/api/python/", None),
"rmm": ("https://docs.rapids.ai/api/rmm/nightly/", None),
}


18 changes: 11 additions & 7 deletions doc/tutorials/external_memory.rst
@@ -138,6 +138,8 @@ the GPU. Following is a snippet from :ref:`sphx_glr_python_examples_external_mem
# It's important to use RMM for GPU-based external memory to improve performance.
# If XGBoost is not built with RMM support, a warning will be raised.
# We use the pool memory resource here; you can also try the `ArenaMemoryResource` for
# improved memory fragmentation handling.
mr = rmm.mr.PoolMemoryResource(rmm.mr.CudaAsyncMemoryResource())
rmm.mr.set_current_device_resource(mr)
# Set the allocator for cupy as well.
@@ -278,13 +280,15 @@ determines the time it takes to run inference, even if a C2C link is available.
Xy_valid = xgboost.ExtMemQuantileDMatrix(it_valid, max_bin=n_bins, ref=Xy_train)
In addition, the GPU implementation relies on an asynchronous memory pool, which is
subject to memory fragmentation even if the ``CudaAsyncMemoryResource`` is used. You might
want to start the training with a fresh pool instead of starting training right after the
ETL process. If you run into out-of-memory errors and you are convinced that the pool is
not full yet (pool memory usage can be profiled with ``nsight-system``), consider tuning
the RMM memory resource like using ``rmm.mr.CudaAsyncMemoryResource`` in conjunction with
``rmm.mr.BinningMemoryResource(mr, 21, 25)`` instead of the
``rmm.mr.PoolMemoryResource(mr)`` shown in the example.
subject to memory fragmentation even if the :py:class:`~rmm.mr.CudaAsyncMemoryResource` is
used. You might want to start training with a fresh pool instead of starting it right
after the ETL process. If you run into out-of-memory errors and you are convinced that
the pool is not full yet (pool memory usage can be profiled with ``nsight-systems``),
consider tuning the RMM memory resource, for example by using
:py:class:`~rmm.mr.CudaAsyncMemoryResource` in conjunction with
:py:class:`BinningMemoryResource(mr, 21, 25) <rmm.mr.BinningMemoryResource>` instead of
the :py:class:`~rmm.mr.PoolMemoryResource`. Alternatively, the
:py:class:`~rmm.mr.ArenaMemoryResource` is also an excellent option.
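For concreteness, a minimal sketch of the two alternatives mentioned above (not part of the tutorial's code; it assumes ``rmm`` and ``cupy`` are installed):

import cupy as cp
import rmm
from rmm.allocators.cupy import rmm_cupy_allocator

# Option 1: the async allocator fronted by a binning resource (bin sizes 2^21 to 2^25 bytes).
mr = rmm.mr.BinningMemoryResource(rmm.mr.CudaAsyncMemoryResource(), 21, 25)

# Option 2: an arena allocator on top of a plain CUDA resource.
# mr = rmm.mr.ArenaMemoryResource(rmm.mr.CudaMemoryResource())

rmm.mr.set_current_device_resource(mr)
# Route CuPy allocations through RMM as well, as in the snippet earlier in this tutorial.
cp.cuda.set_allocator(rmm_cupy_allocator)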

During CPU benchmarking, we used an NVMe connected to a PCIe-4 slot. Other types of
storage can be too slow for practical usage. However, your system will likely perform some
20 changes: 19 additions & 1 deletion python-package/xgboost/testing/data_iter.py
@@ -6,7 +6,7 @@

from xgboost import testing as tm

from ..core import DataIter, ExtMemQuantileDMatrix, QuantileDMatrix
from ..core import DataIter, DMatrix, ExtMemQuantileDMatrix, QuantileDMatrix


def run_mixed_sparsity(device: str) -> None:
@@ -78,6 +78,24 @@ def reset(self) -> None:
ExtMemQuantileDMatrix(it, enable_categorical=True)


def check_uneven_sizes(device: str) -> None:
    """Tests for having irregular data shapes."""
    batches = [
        tm.make_regression(n_samples, 16, use_cupy=device == "cuda")
        for n_samples in [512, 256, 1024]
    ]
    unzip = list(zip(*batches))
    it = tm.IteratorForTest(unzip[0], unzip[1], None, cache="cache", on_host=True)

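    # Both the external-memory DMatrix and the ExtMemQuantileDMatrix should report the
    # same column count and the total row count across the unevenly sized batches.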
    Xy = DMatrix(it)
    assert Xy.num_col() == 16
    assert Xy.num_row() == sum(x.shape[0] for x in unzip[0])

    Xy = ExtMemQuantileDMatrix(it)
    assert Xy.num_col() == 16
    assert Xy.num_row() == sum(x.shape[0] for x in unzip[0])


class CatIter(DataIter): # pylint: disable=too-many-instance-attributes
"""An iterator for testing categorical features."""

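For the user-facing picture, a minimal, hedged sketch (not part of the commit) of a custom DataIter that yields batches with differing row counts, which is the situation the new check_uneven_sizes helper exercises through tm.IteratorForTest:

import numpy as np
import xgboost


class UnevenIter(xgboost.DataIter):
    """Yield three batches whose row counts differ (512, 256, 1024)."""

    def __init__(self) -> None:
        rng = np.random.default_rng(2024)
        self._batches = [
            (rng.normal(size=(n, 16)), rng.normal(size=n)) for n in (512, 256, 1024)
        ]
        self._it = 0
        super().__init__(cache_prefix="cache")

    def next(self, input_data) -> bool:
        if self._it == len(self._batches):
            return False  # no more batches
        X, y = self._batches[self._it]
        input_data(data=X, label=y)
        self._it += 1
        return True

    def reset(self) -> None:
        self._it = 0


Xy = xgboost.DMatrix(UnevenIter())
assert Xy.num_col() == 16
assert Xy.num_row() == 512 + 256 + 1024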
5 changes: 3 additions & 2 deletions src/data/ellpack_page_source.cu
@@ -404,8 +404,9 @@ void ExtEllpackPageSourceImpl<F>::Fetch() {
this->GetCuts()};
this->info_->Extend(proxy_->Info(), false, true);
});
// The size of ellpack is logged in write cache.
LOG(INFO) << "Estimated batch size:"
LOG(INFO) << "Generated an Ellpack page with size: "
<< common::HumanMemUnit(this->page_->Impl()->MemCostBytes())
<< " from an batch with estimated size: "
<< cuda_impl::Dispatch<false>(proxy_, [](auto const& adapter) {
return common::HumanMemUnit(adapter->SizeBytes());
});
6 changes: 5 additions & 1 deletion tests/python-gpu/test_gpu_data_iterator.py
@@ -7,7 +7,7 @@
import xgboost as xgb
from xgboost import testing as tm
from xgboost.testing import no_cupy
from xgboost.testing.data_iter import check_invalid_cat_batches
from xgboost.testing.data_iter import check_invalid_cat_batches, check_uneven_sizes
from xgboost.testing.updater import (
    check_categorical_missing,
    check_categorical_ohe,
@@ -231,3 +231,7 @@ def test_categorical_ohe(tree_method: str) -> None:
@pytest.mark.skipif(**tm.no_cupy())
def test_invalid_cat_batches() -> None:
    check_invalid_cat_batches("cuda")


def test_uneven_sizes() -> None:
    check_uneven_sizes("cuda")
7 changes: 6 additions & 1 deletion tests/python/test_data_iterator.py
@@ -12,7 +12,7 @@
from xgboost import testing as tm
from xgboost.data import SingleBatchInternalIter as SingleBatch
from xgboost.testing import IteratorForTest, make_batches, non_increasing
from xgboost.testing.data_iter import check_invalid_cat_batches
from xgboost.testing.data_iter import check_invalid_cat_batches, check_uneven_sizes
from xgboost.testing.updater import (
    check_categorical_missing,
    check_categorical_ohe,
@@ -375,3 +375,8 @@ def test_categorical_ohe(tree_method: str) -> None:

def test_invalid_cat_batches() -> None:
    check_invalid_cat_batches("cpu")


@pytest.mark.skipif(**tm.no_cupy())
def test_uneven_sizes() -> None:
    check_uneven_sizes("cpu")
