Skip to content

Commit

Permalink
fix: fix PT AutoBatchSize OOM bug and merge execute_all into base (#4047)
Browse files Browse the repository at this point in the history

Fix #4036. Fix #4037.

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit


- **New Features**
  - Improved batch processing methods to enhance compatibility with
    various array-like objects through Array API integration.
  - Added a new test suite to validate the functionality of the
    `AutoBatchSize` class under different conditions, ensuring robust
    behavior with GPU resources.

- **Bug Fixes**
  - Removed the outdated `execute_all` method, streamlining batch
    execution processes.

- **Documentation**
  - Updated minimum TensorFlow version requirement to 2.7 for backend
    compatibility.
  - Clarified installation instructions to reflect the updated TensorFlow
    version requirement.

- **Chores**
  - Specified minimum NumPy version dependency as 1.21 in project
    configuration.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Signed-off-by: Jinzhe Zeng <jinzhe.zeng@rutgers.edu>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
njzjz and pre-commit-ci[bot] authored Aug 6, 2024
1 parent 35ef721 commit cc274c4
Show file tree
Hide file tree
Showing 6 changed files with 192 additions and 119 deletions.
100 changes: 0 additions & 100 deletions deepmd/pt/utils/auto_batch_size.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,5 @@
# SPDX-License-Identifier: LGPL-3.0-or-later
from typing import (
Callable,
Tuple,
Union,
)

import numpy as np
import torch

from deepmd.utils.batch_size import AutoBatchSize as AutoBatchSizeBase
Expand Down Expand Up @@ -64,97 +58,3 @@ def is_oom_error(self, e: Exception) -> bool:
torch.cuda.empty_cache()
return True
return False

def execute_all(
    self, callable: Callable, total_size: int, natoms: int, *args, **kwargs
) -> Tuple[Union[np.ndarray, torch.Tensor]]:
    """Execute a method with all given data.

    The data are processed in batches through ``self.execute`` and the
    per-batch results are concatenated along the first axis.

    Parameters
    ----------
    callable : Callable
        The method should accept *args and **kwargs as input and return
        similar arrays: a single array, a tuple of arrays, or a dict of arrays.
    total_size : int
        Total size
    natoms : int
        The number of atoms
    *args
        Variable length argument list. An np.ndarray or torch.Tensor with
        more than one dimension is sliced along its first (batch) axis;
        anything else is passed through unchanged.
    **kwargs
        Keyword arguments, handled in the same way as ``*args``.

    Returns
    -------
    np.ndarray, torch.Tensor, tuple or dict
        The concatenated results. A dict is returned if the callable returns
        a dict; a tuple if it returns a tuple with more than one item;
        otherwise a single array.

    Raises
    ------
    RuntimeError
        If a result item is neither an np.ndarray nor a torch.Tensor.
    """

    def slice_batch(vv, start_index: int, end_index: int):
        # Only multi-dimensional arrays carry a batch axis to slice.
        if isinstance(vv, (np.ndarray, torch.Tensor)) and vv.ndim > 1:
            return vv[start_index:end_index]
        return vv

    def execute_with_batch_size(
        batch_size: int, start_index: int
    ) -> Tuple[int, Tuple[torch.Tensor]]:
        end_index = min(start_index + batch_size, total_size)
        return (end_index - start_index), callable(
            *[slice_batch(vv, start_index, end_index) for vv in args],
            **{
                kk: slice_batch(vv, start_index, end_index)
                for kk, vv in kwargs.items()
            },
        )

    index = 0
    results = None
    returned_dict = None
    while index < total_size:
        n_batch, result = self.execute(execute_with_batch_size, index, natoms)
        if n_batch == 0:
            # Nothing was computed for this attempt (e.g. an out-of-memory
            # retry), so there is no result to inspect. Looking at it would
            # mis-detect the return type or fail on a missing result.
            continue
        if returned_dict is None:
            # The first real result decides the layout for the whole run.
            returned_dict = isinstance(result, dict)
        index += n_batch

        if returned_dict:
            if results is None:
                results = {kk: [] for kk in result}
            for kk in result:
                results[kk].append(result[kk])
        else:
            if not isinstance(result, tuple):
                result = (result,)
            if results is None:
                results = []
            results.append(result)
    assert results is not None
    assert returned_dict is not None

    def concate_result(r):
        if isinstance(r[0], np.ndarray):
            return np.concatenate(r, axis=0)
        if isinstance(r[0], torch.Tensor):
            return torch.cat(r, dim=0)
        raise RuntimeError(f"Unexpected result type {type(r[0])}")

    if returned_dict:
        return {kk: concate_result(vv) for kk, vv in results.items()}

    r = tuple(concate_result(r) for r in zip(*results))
    if len(r) == 1:
        # avoid returning tuple if callable doesn't return tuple
        r = r[0]
    return r
63 changes: 47 additions & 16 deletions deepmd/utils/batch_size.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
Tuple,
)

import array_api_compat
import numpy as np

from deepmd.utils.errors import (
Expand Down Expand Up @@ -155,6 +156,8 @@ def execute_all(
) -> Tuple[np.ndarray]:
"""Excuate a method with all given data.
This method is compatible with Array API.
Parameters
----------
callable : Callable
Expand All @@ -177,38 +180,66 @@ def execute_with_batch_size(
return (end_index - start_index), callable(
*[
(
vv[start_index:end_index]
if isinstance(vv, np.ndarray) and vv.ndim > 1
vv[start_index:end_index, ...]
if array_api_compat.is_array_api_obj(vv) and vv.ndim > 1
else vv
)
for vv in args
],
**{
kk: (
vv[start_index:end_index]
if isinstance(vv, np.ndarray) and vv.ndim > 1
vv[start_index:end_index, ...]
if array_api_compat.is_array_api_obj(vv) and vv.ndim > 1
else vv
)
for kk, vv in kwargs.items()
},
)

index = 0
results = []
results = None
returned_dict = None
while index < total_size:
n_batch, result = self.execute(execute_with_batch_size, index, natoms)
if not isinstance(result, tuple):
result = (result,)
if n_batch == 0:
continue
returned_dict = (
isinstance(result, dict) if returned_dict is None else returned_dict
)
if not returned_dict:
result = (result,) if not isinstance(result, tuple) else result
index += n_batch
if n_batch:
for rr in result:
rr.reshape((n_batch, -1))
results.append(result)

r = tuple([np.concatenate(r, axis=0) for r in zip(*results)])
if len(r) == 1:
# avoid returning tuple if callable doesn't return tuple
r = r[0]

def append_to_list(res_list, res):
if n_batch:
res_list.append(res)
return res_list

if not returned_dict:
results = [] if results is None else results
results = append_to_list(results, result)
else:
results = {kk: [] for kk in result} if results is None else results
results = {kk: append_to_list(results[kk], result[kk]) for kk in result}
assert results is not None
assert returned_dict is not None

def concate_result(r):
if array_api_compat.is_array_api_obj(r[0]):
xp = array_api_compat.array_namespace(r[0])
ret = xp.concat(r, axis=0)
else:
raise RuntimeError(f"Unexpected result type {type(r[0])}")
return ret

if not returned_dict:
r_list = [concate_result(r) for r in zip(*results)]
r = tuple(r_list)
if len(r) == 1:
# avoid returning tuple if callable doesn't return tuple
r = r[0]
else:
r = {kk: concate_result(vv) for kk, vv in results.items()}
return r

@abstractmethod
Expand Down
2 changes: 1 addition & 1 deletion doc/backend.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ In the documentation, TensorFlow {{ tensorflow_icon }} and PyTorch {{ pytorch_ic
- Model filename extension: `.pb`
- Checkpoint filename extension: `.meta`, `.index`, `.data-00000-of-00001`

[TensorFlow](https://tensorflow.org) 2.2 or above is required.
[TensorFlow](https://tensorflow.org) 2.7 or above is required, because DeePMD-kit requires NumPy 1.21 or above and earlier TensorFlow releases are not compatible with it.
DeePMD-kit does not use the TensorFlow v2 API but uses the TensorFlow v1 API (`tf.compat.v1`) in the graph mode.

### PyTorch {{ pytorch_icon }}
Expand Down
2 changes: 1 addition & 1 deletion doc/install/install-from-source.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pip install --upgrade pip

:::{tab-item} TensorFlow {{ tensorflow_icon }}

The full instruction to install TensorFlow can be found on the official [TensorFlow website](https://www.tensorflow.org/install/pip). TensorFlow 2.2 or later is supported.
The full instructions for installing TensorFlow can be found on the official [TensorFlow website](https://www.tensorflow.org/install/pip). TensorFlow 2.7 or later is supported.

```bash
pip install --upgrade tensorflow
Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ classifiers = [
"Environment :: Console",
]
dependencies = [
'numpy',
# array-api-compat requires numpy>=1.21
'numpy>=1.21',
'scipy',
'pyyaml',
'dargs >= 0.4.7',
Expand Down
141 changes: 141 additions & 0 deletions source/tests/common/test_auto_batch_size.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
# SPDX-License-Identifier: LGPL-3.0-or-later
import os
import sys
import unittest
import unittest.mock

from deepmd.utils.batch_size import (
    AutoBatchSize,
)
from deepmd.utils.errors import (
    OutOfMemoryError,
)

if sys.version_info >= (3, 9):
import array_api_strict as xp
else:
raise unittest.SkipTest("array_api_strict doesn't support Python<=3.8")


class CustomizedAutoBatchSizeCPU(AutoBatchSize):
    """AutoBatchSize test double that reports no GPU."""

    def is_gpu_available(self) -> bool:
        """Always report that no GPU is available."""
        return False

    def is_oom_error(self, e: Exception) -> bool:
        """Treat exactly the OutOfMemoryError instances as OOM errors."""
        return isinstance(e, OutOfMemoryError)


class CustomizedAutoBatchSizeGPU(AutoBatchSize):
    """AutoBatchSize test double that reports an available GPU."""

    def is_gpu_available(self) -> bool:
        """Always report that a GPU is available."""
        return True

    def is_oom_error(self, e: Exception) -> bool:
        """Treat exactly the OutOfMemoryError instances as OOM errors."""
        return isinstance(e, OutOfMemoryError)


class TestAutoBatchSize(unittest.TestCase):
    """Tests for ``AutoBatchSize.execute`` and ``AutoBatchSize.execute_all``.

    Out-of-memory conditions are simulated by raising ``OutOfMemoryError``
    from the executed callable.
    """

    def oom(self, batch_size, start_index):
        """Simulated workload that runs out of memory at batch sizes >= 512."""
        if batch_size >= 512:
            raise OutOfMemoryError
        return batch_size, xp.zeros((batch_size, 2))

    def test_execute_oom_gpu(self):
        # initial batch size 256 = 128 * 2
        auto_batch_size = CustomizedAutoBatchSizeGPU(256, 2.0)
        # no error - 128
        nb, result = auto_batch_size.execute(self.oom, 1, 2)
        self.assertEqual(nb, 128)
        self.assertEqual(result.shape, (128, 2))
        # no error - 256
        nb, result = auto_batch_size.execute(self.oom, 1, 2)
        self.assertEqual(nb, 256)
        self.assertEqual(result.shape, (256, 2))
        # error - 512 return 0, None
        nb, result = auto_batch_size.execute(self.oom, 1, 2)
        self.assertEqual(nb, 0)
        self.assertIsNone(result)
        # 256 again, twice
        for _ in range(2):
            nb, result = auto_batch_size.execute(self.oom, 1, 2)
            self.assertEqual(nb, 256)
            self.assertEqual(result.shape, (256, 2))

    def test_execute_oom_cpu(self):
        # initial batch size 256 = 128 * 2, nb is always 128
        auto_batch_size = CustomizedAutoBatchSizeCPU(256, 2.0)
        for _ in range(5):
            nb, result = auto_batch_size.execute(self.oom, 1, 2)
            self.assertEqual(nb, 128)
            self.assertEqual(result.shape, (128, 2))

    @unittest.mock.patch.dict(os.environ, {"DP_INFER_BATCH_SIZE": "256"}, clear=True)
    def test_execute_oom_environment_variables(self):
        # DP_INFER_BATCH_SIZE = 256 = 128 * 2, nb is always 128
        auto_batch_size = CustomizedAutoBatchSizeGPU(999, 2.0)
        for _ in range(5):
            nb, result = auto_batch_size.execute(self.oom, 1, 2)
            self.assertEqual(nb, 128)
            self.assertEqual(result.shape, (128, 2))

    def test_execute_all(self):
        dd1 = xp.zeros((10000, 2, 1))
        auto_batch_size = CustomizedAutoBatchSizeGPU(256, 2.0)
        dd2 = auto_batch_size.execute_all(xp.asarray, 10000, 2, dd1)
        # assertTrue instead of a bare assert: bare asserts vanish under -O.
        self.assertTrue(xp.all(dd1 == dd2))

    def test_execute_all_dict(self):
        dd0 = xp.zeros((10000, 2, 1, 3, 4))
        dd1 = xp.ones((10000, 2, 1, 3, 4))
        auto_batch_size = CustomizedAutoBatchSizeGPU(256, 2.0)

        def func(dd1):
            return {
                "foo": xp.zeros_like(dd1),
                "bar": xp.ones_like(dd1),
            }

        dd2 = auto_batch_size.execute_all(func, 10000, 2, dd1)
        self.assertTrue(xp.all(dd0 == dd2["foo"]))
        self.assertTrue(xp.all(dd1 == dd2["bar"]))

    def test_execute_all_dict_oom(self):
        # Reproduces #4036 if the "if n_batch == 0: continue" guard in
        # execute_all is commented out.
        dd0 = xp.zeros((10, 2, 1, 3, 4))
        dd1 = xp.ones((10, 2, 1, 3, 4))
        auto_batch_size = CustomizedAutoBatchSizeGPU(4, 2.0)

        def func(dd1):
            # Any batch holding two or more frames runs out of memory.
            if dd1.shape[0] >= 2:
                raise OutOfMemoryError
            return {
                "foo": xp.zeros_like(dd1),
                "bar": xp.ones_like(dd1),
            }

        dd2 = auto_batch_size.execute_all(func, 10, 2, dd1)
        self.assertTrue(xp.all(dd0 == dd2["foo"]))
        self.assertTrue(xp.all(dd1 == dd2["bar"]))

0 comments on commit cc274c4

Please sign in to comment.