
Commit

update ndarray to nested list, test=develop
sandyhouse committed Aug 10, 2021
1 parent 3a2666e commit 773516b
Showing 2 changed files with 99 additions and 71 deletions.
128 changes: 74 additions & 54 deletions python/paddle/distributed/auto_parallel/interface.py
@@ -12,8 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

-import numpy as np
-
+import functools
+import operator
import paddle.fluid.core as core
import paddle
from paddle.fluid.framework import Variable
@@ -48,21 +48,41 @@ def _static_mode_check():
"please use paddle.enable_static().")


+def _get_nested_list_shape(nested_list):
+    """
+    Get the shape of a nested_list.
+    """
+    result = []
+    while isinstance(nested_list, list):
+        result.append(len(nested_list))
+        nested_list = nested_list[0]
+    return result
+
+
+def _flatten_nested_list(nested_list):
+    """
+    Get a list of all items in a nested_list.
+    Ref: https://stackoverflow.com/questions/952914/how-to-make-a-flat-list-out-of-a-list-of-lists
+    """
+    result = functools.reduce(operator.iconcat, nested_list, [])
+    return result
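
# Illustration (not part of this commit): behavior of the two helpers on
# the mesh used throughout this file; note that _flatten_nested_list
# flattens exactly one level of nesting.
assert _get_nested_list_shape([[2, 4, 5], [0, 1, 3]]) == [2, 3]
assert _flatten_nested_list([[2, 4, 5], [0, 1, 3]]) == [2, 4, 5, 0, 1, 3]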


class ProcessMesh(object):
r"""
The class `ProcessMesh` describes the topology of logical processes.
A mesh is an N-dimensional array. The shape of the N-dimensional
array represents the topology of logical processes and every
element of the N-dimensional array represents a logical process. For
-example, the 2-dimensional array numpy.array([[2, 4, 5], [0, 1, 3]])
+example, the 2-dimensional array [[2, 4, 5], [0, 1, 3]]
illustrates six logical processes organized as the topology [2, 3],
i.e., the shape of the 2-dimensional array. With the above topology,
there are two parallel groups, where the first parallel group has a
parallel degree of 2 and the second one has a parallel degree of 3.
And the first logical process is the one with id=2.
Args:
-mesh (numpy.ndarray): an N-dimensional array describes the toplogy
+mesh (list): an N-dimensional array (nested list) that describes the topology
of logical processes. The shape of the N-dimensional array
represents the topology of logical processes and every
element of the N-dimensional array represents a logical process.
@@ -74,18 +94,17 @@ class ProcessMesh(object):
None
Raises:
-ValueError: If `mesh` is not an instance of numpy.ndarray.
+ValueError: If `mesh` is not an instance of list.
Examples:
.. code-block:: python
-import numpy as np
import paddle
import paddle.distributed as dist
paddle.enable_static()
-mesh = dist.ProcessMesh(np.array([[2, 4, 5], [0, 1, 3]]))
+mesh = dist.ProcessMesh([[2, 4, 5], [0, 1, 3]])
assert mesh.parent is None
assert mesh.topology == [2, 3]
assert mesh.process_group == [2, 4, 5, 0, 1, 3]
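
# Illustration (not part of this commit): one reading of the "two
# parallel groups" note above, assuming the groups run along the two
# mesh dimensions.
mesh_list = [[2, 4, 5], [0, 1, 3]]
degree2_groups = [[row[j] for row in mesh_list] for j in range(3)]  # [[2, 0], [4, 1], [5, 3]]
degree3_groups = mesh_list  # [[2, 4, 5], [0, 1, 3]]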
@@ -95,20 +114,23 @@ class ProcessMesh(object):

def __init__(self, mesh, parent=None):
_static_mode_check()
-if mesh is None or not isinstance(mesh, np.ndarray):
-    raise ValueError('mesh must be an instance of numpy.ndarray.')
+if mesh is None or not isinstance(mesh, list):
+    raise ValueError('mesh must be an instance of list.')

+self._topology = _get_nested_list_shape(mesh)
+self._processes = _flatten_nested_list(mesh)

# Every element of mesh must be >= 0.
-assert np.min(mesh) >= 0, ('All elements of mesh must be >= 0.')
+assert min(self._processes) >= 0, ('All elements of mesh must be >= 0.')

-unique_ids = set(mesh.flatten().tolist())
-assert len(unique_ids) == mesh.size, (
+unique_ids = set(self._processes)
+assert len(unique_ids) == len(self._processes), (
'All elements of mesh must be unique.')

if parent is None:
# For root ProcessMesh, the ids of logical processes must be range
# from 0 to N-1, where N is the number of logical processes.
-assert np.max(mesh) == mesh.size - 1, (
+assert max(self._processes) == len(self._processes) - 1, (
'For root ProcessMesh, ids of logical processes must be range '
'from 0 to N-1, where N is the number of logical processes.')
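
# Illustration (not part of this commit) of the root-mesh rule above:
# together with the uniqueness and non-negativity checks, it forces the
# ids to be exactly 0..N-1, e.g.
#     ProcessMesh([[0, 1], [2, 3]])  # ok: max id 3 == 4 - 1
#     ProcessMesh([[0, 2], [3, 4]])  # fails: max id 4 != 4 - 1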

@@ -128,8 +150,6 @@ def __init__(self, mesh, parent=None):
assert unique_ids <= parent_ids, (
'All elements in mesh must belong to its parent.')

-self._topology = list(mesh.shape)
-self._processes = mesh.flatten().tolist()
self._desc = core.ProcessMeshDesc(self._topology, self._processes,
parent_id)
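
# Sketch (not part of this commit; the parent-handling lines are hidden
# above): a sub-mesh is presumably built by passing an existing mesh as
# `parent`, whose process ids must be a subset of the parent's, e.g.
#     root = ProcessMesh([[2, 4, 5], [0, 1, 3]])  # hypothetical usage
#     sub = ProcessMesh([2, 4, 5], parent=root)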

@@ -178,13 +198,12 @@ def set_placement(self, order):
Examples:
.. code-block:: python
-import numpy as np
import paddle
import paddle.distributed as dist
paddle.enable_static()
-mesh = dist.ProcessMesh(np.array([[2, 4, 5], [0, 1, 3]]))
+mesh = dist.ProcessMesh([[2, 4, 5], [0, 1, 3]])
mesh.set_placement([0, 1, 2, 3, 4, 5])
"""
@@ -220,26 +239,26 @@ def __ne__(self, other):
return not self.__eq__(other)


-def _dims_mapping_checker(tensor, mesh, dims_mapping):
-    assert len(tensor.shape) == len(dims_mapping)
+def _dim_mapping_checker(tensor, mesh, dim_mapping):
+    assert len(tensor.shape) == len(dim_mapping)
mesh_dim = len(mesh.topology)
dim_set = set()
-for i in range(len(dims_mapping)):
-    assert dims_mapping[i] == -1 or (dims_mapping[i] < mesh_dim and
-                                     dims_mapping[i] >= 0)
-    if dims_mapping[i] >= 0:
-        assert dims_mapping[i] not in dim_set
-        dim_set.add(dims_mapping[i])
+for i in range(len(dim_mapping)):
+    assert dim_mapping[i] == -1 or (dim_mapping[i] < mesh_dim and
+                                    dim_mapping[i] >= 0)
+    if dim_mapping[i] >= 0:
+        assert dim_mapping[i] not in dim_set
+        dim_set.add(dim_mapping[i])
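
# Illustration (not part of this commit): for a tensor of shape [4, 6]
# on a mesh with topology [2, 3], dim_mapping [0, -1] passes the checks
# (shard tensor dim 0 over mesh dim 0, replicate dim 1); [0, 0] fails
# the uniqueness check, and [0, 2] exceeds the mesh rank.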


-def shard_tensor(x, mesh, dims_mapping):
+def shard_tensor(x, mesh, dim_mapping):
"""
Add distributed attributes for a tensor.
Args:
x (Tensor): the tensor to process.
mesh (ProcessMesh): an instance of ProcessMesh to describe the topology of logical processes.
-dims_mapping (list): a list to describe the mapping between `x` and `mesh`,
+dim_mapping (list): a list to describe the mapping between `x` and `mesh`,
the dimension `i` of `x` is split across the dimension `dim_mapping[i]`, where -1 means
without partition along the corresponding dimension.
@@ -249,23 +268,22 @@ def shard_tensor(x, mesh, dims_mapping):
Examples:
.. code-block:: python
-import numpy as np
import paddle
import paddle.distributed as dist
paddle.enable_static()
-mesh = dist.ProcessMesh(np.array([[2, 4, 5], [0, 1, 3]]))
+mesh = dist.ProcessMesh([[2, 4, 5], [0, 1, 3]])
x = paddle.ones([4, 6])
dist.shard_tensor(x, mesh, [0, -1])
"""
_static_mode_check()
-_dims_mapping_checker(x, mesh, dims_mapping)
+_dim_mapping_checker(x, mesh, dim_mapping)
attr_name = _append_attr_suffix('mesh_id')
x._set_attr(attr_name, mesh._id)
-attr_name = _append_attr_suffix('dims_mapping')
-x._set_attr(attr_name, dims_mapping)
+attr_name = _append_attr_suffix('dim_mapping')
+x._set_attr(attr_name, dim_mapping)
return x
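
# Illustration (not part of this commit): with mesh topology [2, 3] and
# dim_mapping [0, -1], a [4, 6] tensor is split along its first
# dimension over the two processes of mesh dim 0 (an even split gives
# [2, 6] shards); its second dimension is not partitioned.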


@@ -275,13 +293,13 @@ def set_shard_mask(x, mask):
Args:
x (Tensor): the tensor to process.
-mask (numpy.ndarray): the shape of `mask` must be the same as the ProcessMesh belonging to
+mask (list): a nested list. The shape of `mask` must be the same as the ProcessMesh belonging to
the tensor `x`. Every value of `mask` must be one or zero, where one means
the tensor `x` will be put on the corresponding logical process and zero means the tensor `x`
will not be put on the corresponding logical process.
For example, for a ProcessMesh represented by the 2-dimensional
-array numpy.array([[2, 4, 5], [0, 1, 3]]), and a `mask` given by the
-2-dimensional array numpy.array([[1, 0, 1], [0, 1, 0]]),
+array [[2, 4, 5], [0, 1, 3]], and a `mask` given by the
+2-dimensional array [[1, 0, 1], [0, 1, 0]],
then the tensor `x` will only be put on logical processes 2, 5 and 1.
Returns:
Expand All @@ -290,35 +308,34 @@ def set_shard_mask(x, mask):
Examples:
.. code-block:: python
-import numpy as np
import paddle
import paddle.distributed as dist
paddle.enable_static()
-mesh = dist.ProcessMesh(np.array([[2, 4, 5], [0, 1, 3]]))
-mask = np.array([[1, 0, 1], [0, 1, 0]])
+mesh = dist.ProcessMesh([[2, 4, 5], [0, 1, 3]])
+mask = [[1, 0, 1], [0, 1, 0]]
x = paddle.ones([4, 6])
dist.set_shard_mask(x, mask)
"""
_static_mode_check()
-assert isinstance(mask, np.ndarray)
+assert isinstance(mask, list)
attr_name = _append_attr_suffix('mask')
-x._set_attr(attr_name, mask.flatten().tolist())
+x._set_attr(attr_name, _flatten_nested_list(mask))
return x
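
# Illustration (not part of this commit): flattening mesh and mask in
# lockstep shows which processes keep the tensor, matching the
# "processes 2, 5 and 1" example in the docstring above.
kept = [p for p, m in zip(_flatten_nested_list([[2, 4, 5], [0, 1, 3]]),
                          _flatten_nested_list([[1, 0, 1], [0, 1, 0]]))
        if m == 1]
assert kept == [2, 5, 1]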


-def shard_op(op_fn, mesh, dims_mapping_dict, **kwargs):
+def shard_op(op_fn, mesh, dim_mapping_dict, **kwargs):
"""
Call a function and add distributed attributes for the ops added by the function.
Args:
op_fn (callable): a callable object of an API.
mesh (ProcessMesh): an instance of ProcessMesh that specifies the topology of logical processes.
-dims_mapping_dict (dict): a mapping from tensor's name to its dims_mapping.
-The dims_mapping is a list to describe the mapping between a tensor and `mesh`,
-the dimension `i` of the tensor is split across the dimension `dims_mapping[i]`,
+dim_mapping_dict (dict): a mapping from a tensor's name to its dim_mapping.
+The dim_mapping is a list to describe the mapping between a tensor and `mesh`,
+the dimension `i` of the tensor is split across the dimension `dim_mapping[i]`,
where -1 means without partition along the corresponding dimension.
kwargs (dict): a dict of parameter passed to the function `op_fn`.
@@ -328,13 +345,12 @@ def shard_op(op_fn, mesh, dims_mapping_dict, **kwargs):
Examples:
.. code-block:: python
-import numpy as np
import paddle
import paddle.distributed as dist
paddle.enable_static()
-mesh = dist.ProcessMesh(np.array([[2, 4, 5], [0, 1, 3]]))
+mesh = dist.ProcessMesh([[2, 4, 5], [0, 1, 3]])
x = paddle.ones([4, 6])
y = paddle.zeros([4, 6])
kwargs = {'x': x, 'y': y}
@@ -347,15 +363,21 @@ def shard_op(op_fn, mesh, dims_mapping_dict, **kwargs):
op_size = len(main_block.ops)
output = op_fn(**kwargs)
new_op_size = len(main_block.ops)
-if dims_mapping_dict is None: dims_mapping_dict = dict()
+if dim_mapping_dict is None: dim_mapping_dict = dict()
for idx in range(op_size, new_op_size):
op = main_block.ops[idx]
attr_name = _append_attr_suffix('mesh_id')
op._set_attr(attr_name, mesh._id)
-for var_name in op.input_arg_names + op.output_arg_names:
-    if var_name in dims_mapping_dict:
-        attr_name = _append_attr_suffix(var_name)
-        op._set_attr(attr_name, dims_mapping_dict[var_name])
+for var_name in dim_mapping_dict.keys():
+    assert var_name in op.output_arg_names + op.input_arg_names
+    attr_name = _append_attr_suffix(var_name)
+    if var_name in op.input_arg_names:
+        # we use the prefix "IN_" to indicate an input argument name
+        attr_name = "IN_" + attr_name
+    else:
+        # we use the prefix "OUT_" to indicate an output argument name
+        attr_name = "OUT_" + attr_name
+    op._set_attr(attr_name, dim_mapping_dict[var_name])

if isinstance(output, Variable):
output = [output]
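
# Illustration (not part of this commit; the exact attribute suffix from
# _append_attr_suffix is not shown here): a call such as
#     dist.shard_op(paddle.add, mesh, {'x': [0, -1]}, x=x, y=y)
# tags every op it inserts with the mesh id and attaches the dim_mapping
# of `x` under an "IN_"-prefixed attribute, since `x` is an input of
# those ops.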
@@ -368,15 +390,14 @@ def set_offload_device(x, device):
Args:
x (tensor): the tensor to process.
-device (str): the device that the tensor `x` will be put on, e.g., 'gpu:0', 'cpu'.
+device (str): the device that the tensor `x` will be put on, e.g., 'cpu'.
Returns:
Tensor: the tensor `x` itself.
Examples:
.. code-block:: python
-import numpy as np
import paddle
import paddle.distributed as dist
@@ -405,7 +426,6 @@ def set_pipeline_stage(stage):
Examples:
.. code-block:: python
-import numpy as np
import paddle
import paddle.distributed as dist
