From cc274c45930026f6eff1f452456a912257822b4d Mon Sep 17 00:00:00 2001 From: Jinzhe Zeng Date: Tue, 6 Aug 2024 04:38:17 -0400 Subject: [PATCH] fix: fix PT AutoBatchSize OOM bug and merge execute_all into base (#4047) Fix #4036. Fix #4037. ## Summary by CodeRabbit - **New Features** - Improved batch processing methods to enhance compatibility with various array-like objects through Array API integration. - Added a new test suite to validate the functionality of the `AutoBatchSize` class under different conditions, ensuring robust behavior with GPU resources. - **Bug Fixes** - Removed the outdated `execute_all` method, streamlining batch execution processes. - **Documentation** - Updated minimum TensorFlow version requirement to 2.7 for backend compatibility. - Clarified installation instructions to reflect the updated TensorFlow version requirement. - **Chores** - Specified minimum NumPy version dependency as 1.21 in project configuration. --------- Signed-off-by: Jinzhe Zeng Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> --- deepmd/pt/utils/auto_batch_size.py | 100 -------------- deepmd/utils/batch_size.py | 63 ++++++--- doc/backend.md | 2 +- doc/install/install-from-source.md | 2 +- pyproject.toml | 3 +- source/tests/common/test_auto_batch_size.py | 141 ++++++++++++++++++++ 6 files changed, 192 insertions(+), 119 deletions(-) create mode 100644 source/tests/common/test_auto_batch_size.py diff --git a/deepmd/pt/utils/auto_batch_size.py b/deepmd/pt/utils/auto_batch_size.py index 6dfb80067a..3d6054296f 100644 --- a/deepmd/pt/utils/auto_batch_size.py +++ b/deepmd/pt/utils/auto_batch_size.py @@ -1,11 +1,5 @@ # SPDX-License-Identifier: LGPL-3.0-or-later -from typing import ( - Callable, - Tuple, - Union, -) -import numpy as np import torch from deepmd.utils.batch_size import AutoBatchSize as AutoBatchSizeBase @@ -64,97 +58,3 @@ def is_oom_error(self, e: Exception) -> bool: torch.cuda.empty_cache() return True return False - - def execute_all( - self, callable: Callable, total_size: int, natoms: int, *args, **kwargs - ) -> Tuple[Union[np.ndarray, torch.Tensor]]: - """Excuate a method with all given data. - - Parameters - ---------- - callable : Callable - The method should accept *args and **kwargs as input and return the similiar array. - total_size : int - Total size - natoms : int - The number of atoms - *args - Variable length argument list. - **kwargs - If 2D np.ndarray or torch.Tensor, assume the first axis is batch; otherwise do nothing. - """ - - def execute_with_batch_size( - batch_size: int, start_index: int - ) -> Tuple[int, Tuple[torch.Tensor]]: - end_index = start_index + batch_size - end_index = min(end_index, total_size) - return (end_index - start_index), callable( - *[ - ( - vv[start_index:end_index] - if (isinstance(vv, np.ndarray) or isinstance(vv, torch.Tensor)) - and vv.ndim > 1 - else vv - ) - for vv in args - ], - **{ - kk: ( - vv[start_index:end_index] - if (isinstance(vv, np.ndarray) or isinstance(vv, torch.Tensor)) - and vv.ndim > 1 - else vv - ) - for kk, vv in kwargs.items() - }, - ) - - index = 0 - results = None - returned_dict = None - while index < total_size: - n_batch, result = self.execute(execute_with_batch_size, index, natoms) - returned_dict = ( - isinstance(result, dict) if returned_dict is None else returned_dict - ) - if not returned_dict: - result = (result,) if not isinstance(result, tuple) else result - index += n_batch - - def append_to_list(res_list, res): - if n_batch: - res_list.append(res) - return res_list - - if not returned_dict: - results = [] if results is None else results - results = append_to_list(results, result) - else: - results = ( - {kk: [] for kk in result.keys()} if results is None else results - ) - results = { - kk: append_to_list(results[kk], result[kk]) for kk in result.keys() - } - assert results is not None - assert returned_dict is not None - - def concate_result(r): - if isinstance(r[0], np.ndarray): - ret = np.concatenate(r, axis=0) - elif isinstance(r[0], torch.Tensor): - ret = torch.cat(r, dim=0) - else: - raise RuntimeError(f"Unexpected result type {type(r[0])}") - return ret - - if not returned_dict: - r_list = [concate_result(r) for r in zip(*results)] - r = tuple(r_list) - if len(r) == 1: - # avoid returning tuple if callable doesn't return tuple - r = r[0] - else: - r = {kk: concate_result(vv) for kk, vv in results.items()} - return r diff --git a/deepmd/utils/batch_size.py b/deepmd/utils/batch_size.py index 30971c7256..8fe67ad6fc 100644 --- a/deepmd/utils/batch_size.py +++ b/deepmd/utils/batch_size.py @@ -10,6 +10,7 @@ Tuple, ) +import array_api_compat import numpy as np from deepmd.utils.errors import ( @@ -155,6 +156,8 @@ def execute_all( ) -> Tuple[np.ndarray]: """Excuate a method with all given data. + This method is compatible with Array API. + Parameters ---------- callable : Callable @@ -177,16 +180,16 @@ def execute_with_batch_size( return (end_index - start_index), callable( *[ ( - vv[start_index:end_index] - if isinstance(vv, np.ndarray) and vv.ndim > 1 + vv[start_index:end_index, ...] + if array_api_compat.is_array_api_obj(vv) and vv.ndim > 1 else vv ) for vv in args ], **{ kk: ( - vv[start_index:end_index] - if isinstance(vv, np.ndarray) and vv.ndim > 1 + vv[start_index:end_index, ...] + if array_api_compat.is_array_api_obj(vv) and vv.ndim > 1 else vv ) for kk, vv in kwargs.items() @@ -194,21 +197,49 @@ def execute_with_batch_size( ) index = 0 - results = [] + results = None + returned_dict = None while index < total_size: n_batch, result = self.execute(execute_with_batch_size, index, natoms) - if not isinstance(result, tuple): - result = (result,) + if n_batch == 0: + continue + returned_dict = ( + isinstance(result, dict) if returned_dict is None else returned_dict + ) + if not returned_dict: + result = (result,) if not isinstance(result, tuple) else result index += n_batch - if n_batch: - for rr in result: - rr.reshape((n_batch, -1)) - results.append(result) - - r = tuple([np.concatenate(r, axis=0) for r in zip(*results)]) - if len(r) == 1: - # avoid returning tuple if callable doesn't return tuple - r = r[0] + + def append_to_list(res_list, res): + if n_batch: + res_list.append(res) + return res_list + + if not returned_dict: + results = [] if results is None else results + results = append_to_list(results, result) + else: + results = {kk: [] for kk in result} if results is None else results + results = {kk: append_to_list(results[kk], result[kk]) for kk in result} + assert results is not None + assert returned_dict is not None + + def concate_result(r): + if array_api_compat.is_array_api_obj(r[0]): + xp = array_api_compat.array_namespace(r[0]) + ret = xp.concat(r, axis=0) + else: + raise RuntimeError(f"Unexpected result type {type(r[0])}") + return ret + + if not returned_dict: + r_list = [concate_result(r) for r in zip(*results)] + r = tuple(r_list) + if len(r) == 1: + # avoid returning tuple if callable doesn't return tuple + r = r[0] + else: + r = {kk: concate_result(vv) for kk, vv in results.items()} return r @abstractmethod diff --git a/doc/backend.md b/doc/backend.md index 8639396941..f6eaf0e45b 100644 --- a/doc/backend.md +++ b/doc/backend.md @@ -12,7 +12,7 @@ In the documentation, TensorFlow {{ tensorflow_icon }} and PyTorch {{ pytorch_ic - Model filename extension: `.pb` - Checkpoint filename extension: `.meta`, `.index`, `.data-00000-of-00001` -[TensorFlow](https://tensorflow.org) 2.2 or above is required. +[TensorFlow](https://tensorflow.org) 2.7 or above is required, since NumPy 1.21 or above is required. DeePMD-kit does not use the TensorFlow v2 API but uses the TensorFlow v1 API (`tf.compat.v1`) in the graph mode. ### PyTorch {{ pytorch_icon }} diff --git a/doc/install/install-from-source.md b/doc/install/install-from-source.md index 49542c0465..c0b78004d0 100644 --- a/doc/install/install-from-source.md +++ b/doc/install/install-from-source.md @@ -40,7 +40,7 @@ pip install --upgrade pip :::{tab-item} TensorFlow {{ tensorflow_icon }} -The full instruction to install TensorFlow can be found on the official [TensorFlow website](https://www.tensorflow.org/install/pip). TensorFlow 2.2 or later is supported. +The full instruction to install TensorFlow can be found on the official [TensorFlow website](https://www.tensorflow.org/install/pip). TensorFlow 2.7 or later is supported. ```bash pip install --upgrade tensorflow diff --git a/pyproject.toml b/pyproject.toml index 45752a365d..9807bc6545 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,7 +36,8 @@ classifiers = [ "Environment :: Console", ] dependencies = [ - 'numpy', + # array-api-compat requires numpy>=1.21 + 'numpy>=1.21', 'scipy', 'pyyaml', 'dargs >= 0.4.7', diff --git a/source/tests/common/test_auto_batch_size.py b/source/tests/common/test_auto_batch_size.py new file mode 100644 index 0000000000..0369bbb70c --- /dev/null +++ b/source/tests/common/test_auto_batch_size.py @@ -0,0 +1,141 @@ +# SPDX-License-Identifier: LGPL-3.0-or-later +import os +import sys +import unittest + +from deepmd.utils.batch_size import ( + AutoBatchSize, +) +from deepmd.utils.errors import ( + OutOfMemoryError, +) + +if sys.version_info >= (3, 9): + import array_api_strict as xp +else: + raise unittest.SkipTest("array_api_strict doesn't support Python<=3.8") + + +class CustomizedAutoBatchSizeCPU(AutoBatchSize): + def is_gpu_available(self): + return False + + def is_oom_error(self, e): + return isinstance(e, OutOfMemoryError) + + +class CustomizedAutoBatchSizeGPU(AutoBatchSize): + def is_gpu_available(self): + return True + + def is_oom_error(self, e): + return isinstance(e, OutOfMemoryError) + + +class TestAutoBatchSize(unittest.TestCase): + def oom(self, batch_size, start_index): + if batch_size >= 512: + raise OutOfMemoryError + return batch_size, xp.zeros((batch_size, 2)) + + def test_execute_oom_gpu(self): + # initial batch size 256 = 128 * 2 + auto_batch_size = CustomizedAutoBatchSizeGPU(256, 2.0) + # no error - 128 + nb, result = auto_batch_size.execute(self.oom, 1, 2) + self.assertEqual(nb, 128) + self.assertEqual(result.shape, (128, 2)) + # no error - 256 + nb, result = auto_batch_size.execute(self.oom, 1, 2) + self.assertEqual(nb, 256) + self.assertEqual(result.shape, (256, 2)) + # error - 512 return 0, None + nb, result = auto_batch_size.execute(self.oom, 1, 2) + self.assertEqual(nb, 0) + self.assertIsNone(result) + # 256 again + nb, result = auto_batch_size.execute(self.oom, 1, 2) + self.assertEqual(nb, 256) + self.assertEqual(result.shape, (256, 2)) + # 256 again + nb, result = auto_batch_size.execute(self.oom, 1, 2) + self.assertEqual(nb, 256) + self.assertEqual(result.shape, (256, 2)) + + def test_execute_oom_cpu(self): + # initial batch size 256 = 128 * 2, nb is always 128 + auto_batch_size = CustomizedAutoBatchSizeCPU(256, 2.0) + nb, result = auto_batch_size.execute(self.oom, 1, 2) + self.assertEqual(nb, 128) + self.assertEqual(result.shape, (128, 2)) + nb, result = auto_batch_size.execute(self.oom, 1, 2) + self.assertEqual(nb, 128) + self.assertEqual(result.shape, (128, 2)) + nb, result = auto_batch_size.execute(self.oom, 1, 2) + self.assertEqual(nb, 128) + self.assertEqual(result.shape, (128, 2)) + nb, result = auto_batch_size.execute(self.oom, 1, 2) + self.assertEqual(nb, 128) + self.assertEqual(result.shape, (128, 2)) + nb, result = auto_batch_size.execute(self.oom, 1, 2) + self.assertEqual(nb, 128) + self.assertEqual(result.shape, (128, 2)) + + @unittest.mock.patch.dict(os.environ, {"DP_INFER_BATCH_SIZE": "256"}, clear=True) + def test_execute_oom_environment_variables(self): + # DP_INFER_BATCH_SIZE = 256 = 128 * 2, nb is always 128 + auto_batch_size = CustomizedAutoBatchSizeGPU(999, 2.0) + nb, result = auto_batch_size.execute(self.oom, 1, 2) + self.assertEqual(nb, 128) + self.assertEqual(result.shape, (128, 2)) + nb, result = auto_batch_size.execute(self.oom, 1, 2) + self.assertEqual(nb, 128) + self.assertEqual(result.shape, (128, 2)) + nb, result = auto_batch_size.execute(self.oom, 1, 2) + self.assertEqual(nb, 128) + self.assertEqual(result.shape, (128, 2)) + nb, result = auto_batch_size.execute(self.oom, 1, 2) + self.assertEqual(nb, 128) + self.assertEqual(result.shape, (128, 2)) + nb, result = auto_batch_size.execute(self.oom, 1, 2) + self.assertEqual(nb, 128) + self.assertEqual(result.shape, (128, 2)) + + def test_execute_all(self): + dd1 = xp.zeros((10000, 2, 1)) + auto_batch_size = CustomizedAutoBatchSizeGPU(256, 2.0) + dd2 = auto_batch_size.execute_all(xp.asarray, 10000, 2, dd1) + assert xp.all(dd1 == dd2) + + def test_execute_all_dict(self): + dd0 = xp.zeros((10000, 2, 1, 3, 4)) + dd1 = xp.ones((10000, 2, 1, 3, 4)) + auto_batch_size = CustomizedAutoBatchSizeGPU(256, 2.0) + + def func(dd1): + return { + "foo": xp.zeros_like(dd1), + "bar": xp.ones_like(dd1), + } + + dd2 = auto_batch_size.execute_all(func, 10000, 2, dd1) + assert xp.all(dd0 == dd2["foo"]) + assert xp.all(dd1 == dd2["bar"]) + + def test_execute_all_dict_oom(self): + # to reproduce #4036 when commenting "if n_batch == 0: continue" + dd0 = xp.zeros((10, 2, 1, 3, 4)) + dd1 = xp.ones((10, 2, 1, 3, 4)) + auto_batch_size = CustomizedAutoBatchSizeGPU(4, 2.0) + + def func(dd1): + if dd1.shape[0] >= 2: + raise OutOfMemoryError + return { + "foo": xp.zeros_like(dd1), + "bar": xp.ones_like(dd1), + } + + dd2 = auto_batch_size.execute_all(func, 10, 2, dd1) + assert xp.all(dd0 == dd2["foo"]) + assert xp.all(dd1 == dd2["bar"])