From 6fdd8ff81d292d79bb236ca166bd0ebda14d2357 Mon Sep 17 00:00:00 2001 From: LiYuRio Date: Thu, 22 Sep 2022 10:15:08 +0800 Subject: [PATCH] move all_reduce --- .../collective/ProcessGroupGloo.cc | 8 ++ .../distributed/collective/ProcessGroupGloo.h | 6 + python/paddle/distributed/collective.py | 119 +----------------- .../distributed/communication/all_reduce.py | 87 +++++++++++++ .../paddle/distributed/communication/group.py | 26 +++- .../{comm_utils.py => reduce.py} | 28 ++++- .../communication/stream/all_reduce.py | 50 ++++++-- .../distributed/fleet/layers/mpu/mp_ops.py | 2 +- python/paddle/distributed/parallel.py | 2 + 9 files changed, 197 insertions(+), 131 deletions(-) create mode 100644 python/paddle/distributed/communication/all_reduce.py rename python/paddle/distributed/communication/{comm_utils.py => reduce.py} (59%) diff --git a/paddle/fluid/distributed/collective/ProcessGroupGloo.cc b/paddle/fluid/distributed/collective/ProcessGroupGloo.cc index b23942b114f3be..097c9799b70f23 100644 --- a/paddle/fluid/distributed/collective/ProcessGroupGloo.cc +++ b/paddle/fluid/distributed/collective/ProcessGroupGloo.cc @@ -293,6 +293,14 @@ std::shared_ptr ProcessGroupGloo::AllReduce( std::vector& inputs, std::vector& outputs, const AllreduceOptions& opts) { + return AllReduce(inputs, outputs, opts, true); +} + +std::shared_ptr ProcessGroupGloo::AllReduce( + std::vector& inputs, + std::vector& outputs, + const AllreduceOptions& opts, + bool sync_op) { auto tag = next_tag(); std::shared_ptr task; auto context = get_context(); diff --git a/paddle/fluid/distributed/collective/ProcessGroupGloo.h b/paddle/fluid/distributed/collective/ProcessGroupGloo.h index 95ce18c1d8217e..d911da91eb1a32 100644 --- a/paddle/fluid/distributed/collective/ProcessGroupGloo.h +++ b/paddle/fluid/distributed/collective/ProcessGroupGloo.h @@ -120,6 +120,12 @@ class ProcessGroupGloo : public ProcessGroup { std::vector& outputs, const AllreduceOptions& opts = AllreduceOptions()) override; + std::shared_ptr AllReduce( + std::vector& inputs, + std::vector& outputs, + const AllreduceOptions& opts, + bool sync_op) override; + std::shared_ptr Barrier( const BarrierOptions& = BarrierOptions()) override; diff --git a/python/paddle/distributed/collective.py b/python/paddle/distributed/collective.py index e79d736421f68c..41cb3256c8f5d5 100644 --- a/python/paddle/distributed/collective.py +++ b/python/paddle/distributed/collective.py @@ -52,8 +52,9 @@ from .fleet.layers.mpu.mp_ops import _linear from .fleet.layers.mpu.mp_ops import _parallel_linear from .fleet.layers.mpu.mp_ops import _parallel_embedding -from .communication.comm_utils import ReduceOp -from .communication.group import Group +from .communication.group import Group, _add_new_group +from .communication.all_reduce import all_reduce +from .communication.reduce import _get_reduce_op, ReduceOp __all__ = [] @@ -153,19 +154,6 @@ def _new_ring_id(): return len(_get_group_map()) + max(_get_global_env().nrings, 9) -def _get_reduce_op(reduce_op, func_name): - if reduce_op == ReduceOp.SUM: - return core.ReduceOp.SUM - elif reduce_op == ReduceOp.MAX: - return core.ReduceOp.MAX - elif reduce_op == ReduceOp.MIN: - return core.ReduceOp.MIN - elif reduce_op == ReduceOp.PROD: - return core.ReduceOp.PRODUCT - else: - raise ValueError("Unknown reduce_op type for {}.".format(func_name)) - - def get_group(id=0): """ @@ -411,6 +399,9 @@ def new_group(ranks=None, backend=None, timeout=_default_timeout): _group_map_by_name[group_name] = group _group_map[gid] = group _group_map_backend[group] = 
backend + #TODO: The method below is a new method for group management, will replace the previous + # three in the future. + _add_new_group(group) # TODO(shenliang03): This is a temporary solution to solve the problem of # hang caused by tcp @@ -704,104 +695,6 @@ def broadcast(tensor, src, group=None, sync_op=True): }) -def all_reduce(tensor, op=ReduceOp.SUM, group=None, sync_op=True): - """ - - Reduce a tensor over all ranks so that all get the result. - As shown below, one process is started with a GPU and the data of this process is represented - by its group rank. The reduce operator is sum. Through all_reduce operator, - each GPU will have the sum of the data from all GPUs. - - .. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/allreduce.png - :width: 800 - :alt: all_reduce - :align: center - - Args: - tensor (Tensor): The input Tensor. It also works as the output Tensor. Its data type - should be float16, float32, float64, int32, int64, int8, uint8 or bool. - op (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.MIN|ReduceOp.PROD, optional): The operation used. Default value is ReduceOp.SUM. - group (Group, optional): The group instance return by new_group or None for global default group. - sync_op (bool, optional): Wether this op is a sync op. Default value is True. - - Returns: - None. - - Examples: - .. code-block:: python - - # required: distributed - import paddle - import paddle.distributed as dist - - dist.init_parallel_env() - if dist.get_rank() == 0: - data = paddle.to_tensor([[4, 5, 6], [4, 5, 6]]) - else: - data = paddle.to_tensor([[1, 2, 3], [1, 2, 3]]) - dist.all_reduce(data) - print(data) - # [[5, 7, 9], [5, 7, 9]] (2 GPUs) - """ - if group is not None and not group.is_member(): - return - - if in_dygraph_mode(): - op_type = _get_reduce_op(op, "all_reduce") - group = _get_default_group() if group is None else group - task = group.process_group.allreduce(tensor, op_type) - if sync_op: - task.wait() - return None - else: - return task - - use_calc_stream = sync_op - ring_id = 0 if group is None else group.id - if _non_static_mode(): - if op == ReduceOp.SUM: - return _legacy_C_ops.c_allreduce_sum_(tensor, 'use_calc_stream', - use_calc_stream, 'ring_id', - ring_id) - elif op == ReduceOp.MAX: - return _legacy_C_ops.c_allreduce_max_(tensor, 'use_calc_stream', - use_calc_stream, 'ring_id', - ring_id) - elif op == ReduceOp.MIN: - return _legacy_C_ops.c_allreduce_min_(tensor, 'use_calc_stream', - use_calc_stream, 'ring_id', - ring_id) - elif op == ReduceOp.PROD: - return _legacy_C_ops.c_allreduce_prod_(tensor, 'use_calc_stream', - use_calc_stream, 'ring_id', - ring_id) - else: - raise ValueError("Unknown parameter: {}.".format(op)) - - check_variable_and_dtype(tensor, 'tensor', [ - 'float16', 'float32', 'float64', 'int32', 'int64', 'int8', 'uint8', - 'bool' - ], 'all_reduce') - if op == ReduceOp.SUM: - op_type = 'c_allreduce_sum' - elif op == ReduceOp.MAX: - op_type = 'c_allreduce_max' - elif op == ReduceOp.MIN: - op_type = 'c_allreduce_min' - elif op == ReduceOp.PROD: - op_type = 'c_allreduce_prod' - if not isinstance(ring_id, int): - raise ValueError("The type of 'ring_id' for all_reduce should be int.") - helper = LayerHelper(op_type, **locals()) - helper.append_op(type=op_type, - inputs={'X': [tensor]}, - outputs={'Out': [tensor]}, - attrs={ - 'ring_id': ring_id, - 'use_calc_stream': use_calc_stream - }) - - def reduce(tensor, dst, op=ReduceOp.SUM, group=None, sync_op=True): """ diff --git 
a/python/paddle/distributed/communication/all_reduce.py b/python/paddle/distributed/communication/all_reduce.py new file mode 100644 index 00000000000000..737e0cbbfb56c0 --- /dev/null +++ b/python/paddle/distributed/communication/all_reduce.py @@ -0,0 +1,87 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import paddle +import paddle.fluid.framework as framework +from paddle.distributed.communication import stream as stream +from paddle.distributed.communication.reduce import ReduceOp + + +def all_reduce(tensor, op=ReduceOp.SUM, group=None, sync_op=True): + """ + + Reduce a tensor over all ranks so that all get the result. + As shown below, one process is started with a GPU and the data of this process is represented + by its group rank. The reduce operator is sum. Through all_reduce operator, + each GPU will have the sum of the data from all GPUs. + + .. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/allreduce.png + :width: 800 + :alt: all_reduce + :align: center + + Args: + tensor (Tensor): The input Tensor. It also works as the output Tensor. Its data type + should be float16, float32, float64, int32, int64, int8, uint8 or bool. + op (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.MIN|ReduceOp.PROD, optional): The operation used. Default value is ReduceOp.SUM. + group (Group, optional): The group instance return by new_group or None for global default group. + sync_op (bool, optional): Wether this op is a sync op. Default value is True. + + Returns: + Return a task object. + + Examples: + .. 
code-block:: python + + # required: distributed + import paddle + import paddle.distributed as dist + + dist.init_parallel_env() + if dist.get_rank() == 0: + data = paddle.to_tensor([[4, 5, 6], [4, 5, 6]]) + else: + data = paddle.to_tensor([[1, 2, 3], [1, 2, 3]]) + dist.all_reduce(data) + print(data) + # [[5, 7, 9], [5, 7, 9]] (2 GPUs) + """ + if not framework._in_legacy_dygraph(): + return stream.all_reduce(tensor, + op=op, + group=group, + sync_op=sync_op, + use_calc_stream=False) + + # code below will be removed after we remove the old dygraph + use_calc_stream = sync_op + ring_id = 0 if group is None else group.id + if op == ReduceOp.SUM: + return paddle._legacy_C_ops.c_allreduce_sum_(tensor, 'use_calc_stream', + use_calc_stream, 'ring_id', + ring_id) + elif op == ReduceOp.MAX: + return paddle._legacy_C_ops.c_allreduce_max_(tensor, 'use_calc_stream', + use_calc_stream, 'ring_id', + ring_id) + elif op == ReduceOp.MIN: + return paddle._legacy_C_ops.c_allreduce_min_(tensor, 'use_calc_stream', + use_calc_stream, 'ring_id', + ring_id) + elif op == ReduceOp.PROD: + return paddle._legacy_C_ops.c_allreduce_prod_(tensor, 'use_calc_stream', + use_calc_stream, + 'ring_id', ring_id) + else: + raise ValueError("Unknown parameter: {}.".format(op)) diff --git a/python/paddle/distributed/communication/group.py b/python/paddle/distributed/communication/group.py index e9094ba5145546..6b4e545b245d1e 100644 --- a/python/paddle/distributed/communication/group.py +++ b/python/paddle/distributed/communication/group.py @@ -18,9 +18,9 @@ class Group(): The abstract representation of group. """ - def __init__(self, group_rank, id, ranks, pg=None, name=None): - self._group_rank = group_rank - self._world_size = len(ranks) if group_rank >= 0 else -1 + def __init__(self, rank_in_group, id, ranks, pg=None, name=None): + self._rank_in_group = rank_in_group + self._world_size = len(ranks) if rank_in_group >= 0 else -1 self._id = id self._ranks = ranks self._pg = pg @@ -28,7 +28,7 @@ def __init__(self, group_rank, id, ranks, pg=None, name=None): @property def rank(self): - return self._group_rank + return self._rank_in_group @property def ranks(self): @@ -74,3 +74,21 @@ def __repr__(self): debug_str += "; name: " debug_str += self.name if self.name else "None" return debug_str + + +class _GroupManager(): + global_group_id = 0 + group_map_by_id = {} + + +def _get_global_group(): + if _GroupManager.global_group_id not in _GroupManager.group_map_by_id: + raise RuntimeError("The global group is not initialized.") + return _GroupManager.group_map_by_id[_GroupManager.global_group_id] + + +def _add_new_group(group): + if group.id in _GroupManager.group_map_by_id: + raise RuntimeError("The group with id {} already exist.".format( + group.id)) + _GroupManager.group_map_by_id[group.id] = group diff --git a/python/paddle/distributed/communication/comm_utils.py b/python/paddle/distributed/communication/reduce.py similarity index 59% rename from python/paddle/distributed/communication/comm_utils.py rename to python/paddle/distributed/communication/reduce.py index 62e1bcb4cca94d..5caa5bebedfd81 100644 --- a/python/paddle/distributed/communication/comm_utils.py +++ b/python/paddle/distributed/communication/reduce.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
@@ -12,6 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +import paddle.fluid.framework as framework +import paddle.fluid.core as core + class ReduceOp: """ @@ -48,3 +51,26 @@ class ReduceOp: MIN = 2 PROD = 3 AVG = 4 + + +def _get_reduce_op(reduce_op, func_name): + if framework.in_dygraph_mode(): + if reduce_op == ReduceOp.SUM: + return core.ReduceOp.SUM + elif reduce_op == ReduceOp.MAX: + return core.ReduceOp.MAX + elif reduce_op == ReduceOp.MIN: + return core.ReduceOp.MIN + elif reduce_op == ReduceOp.PROD: + return core.ReduceOp.PRODUCT + else: + if reduce_op == ReduceOp.SUM: + return 'c_allreduce_sum' + elif reduce_op == ReduceOp.MAX: + return 'c_allreduce_max' + elif reduce_op == ReduceOp.MIN: + return 'c_allreduce_min' + elif reduce_op == ReduceOp.PROD: + return 'c_allreduce_prod' + + raise ValueError("Unknown reduce_op type for {}.".format(func_name)) diff --git a/python/paddle/distributed/communication/stream/all_reduce.py b/python/paddle/distributed/communication/stream/all_reduce.py index e4cfa6d3218c23..965a6ae89008a3 100644 --- a/python/paddle/distributed/communication/stream/all_reduce.py +++ b/python/paddle/distributed/communication/stream/all_reduce.py @@ -13,12 +13,16 @@ # limitations under the License. import paddle.fluid.framework as framework -from paddle.distributed import collective +import paddle.fluid.data_feeder as data_feeder +import paddle.fluid.layer_helper as layer_helper +from paddle.distributed.communication.reduce import _get_reduce_op, ReduceOp +from paddle.distributed.communication.group import _get_global_group def _all_reduce_in_dygraph(tensor, op, group, sync_op, use_calc_stream): - op_type = collective._get_reduce_op(op, "all_reduce") - group = collective._get_default_group() if group is None else group + op_type = _get_reduce_op(op, "all_reduce") + + group = _get_global_group() if group is None else group if use_calc_stream: return group.process_group.allreduce_on_calc_stream(tensor, op_type) @@ -29,8 +33,34 @@ def _all_reduce_in_dygraph(tensor, op, group, sync_op, use_calc_stream): return task +def _all_reduce_in_static_mode(tensor, op, group, sync_op, use_calc_stream): + data_feeder.check_variable_and_dtype(tensor, 'tensor', [ + 'float16', 'float32', 'float64', 'int32', 'int64', 'int8', 'uint8', + 'bool' + ], 'all_reduce') + + op_type = _get_reduce_op(op, "all_reduce") + ring_id = 0 if group is None else group.id + + if not isinstance(ring_id, int): + raise ValueError("The type of 'ring_id' for all_reduce should be int.") + + # TODO: Support task and use task.wait in static mode + # Use use_calc_stream rather than sync_op + helper = layer_helper.LayerHelper(op_type, **locals()) + helper.append_op(type=op_type, + inputs={'X': [tensor]}, + outputs={'Out': [tensor]}, + attrs={ + 'ring_id': ring_id, + 'use_calc_stream': sync_op + }) + + return None + + def all_reduce(tensor, - op=collective.ReduceOp.SUM, + op=ReduceOp.SUM, group=None, sync_op=True, use_calc_stream=False): @@ -41,7 +71,7 @@ def all_reduce(tensor, Args: tensor (Tensor): The input tensor on each rank. The result will overwrite this tenor after communication. Support float16, float32, float64, int32 or int64 as the input data type. - op (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.Min|ReduceOp.PROD, optional): The reduction used. If none is given, use ReduceOp.SUM as default. + op (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.MIN|ReduceOp.PROD, optional): The reduction used. If none is given, use ReduceOp.SUM as default. 
group (Group, optional): Communicate in which group. If none is given, use the global group as default. sync_op (bool, optional): Indicate whether the communication is sync or not. If none is given, use true as default. use_calc_stream (bool, optional): Indicate whether the communication is done on calculation stream. If none is given, use false as default. This @@ -50,9 +80,6 @@ def all_reduce(tensor, Returns: Return a task object. - Warning: - This API only supports the dygraph mode now. - Examples: .. code-block:: python @@ -84,7 +111,6 @@ def all_reduce(tensor, if framework.in_dygraph_mode(): return _all_reduce_in_dygraph(tensor, op, group, sync_op, use_calc_stream) - - raise RuntimeError( - "paddle.distributed.stream.all_reduce is only supported in dygraph mode now." - ) + else: + return _all_reduce_in_static_mode(tensor, op, group, sync_op, + use_calc_stream) diff --git a/python/paddle/distributed/fleet/layers/mpu/mp_ops.py b/python/paddle/distributed/fleet/layers/mpu/mp_ops.py index dc4dc05c7ba41a..a2f3bde6cfc648 100644 --- a/python/paddle/distributed/fleet/layers/mpu/mp_ops.py +++ b/python/paddle/distributed/fleet/layers/mpu/mp_ops.py @@ -22,7 +22,7 @@ from paddle.fluid.data_feeder import check_variable_and_dtype from paddle.fluid.dygraph import layers from paddle.distributed import collective -from ....communication.comm_utils import ReduceOp +from ....communication.reduce import ReduceOp from paddle.fluid.data_feeder import check_dtype import paddle.fluid.dygraph_utils as dygraph_utils diff --git a/python/paddle/distributed/parallel.py b/python/paddle/distributed/parallel.py index 29825e81b774a1..19a24488f94429 100644 --- a/python/paddle/distributed/parallel.py +++ b/python/paddle/distributed/parallel.py @@ -43,6 +43,7 @@ from paddle.distributed.collective import _new_process_group_impl from paddle.distributed.collective import Group from paddle.distributed.collective import _set_group_map_backend +from paddle.distributed.communication.group import _add_new_group __all__ = [] @@ -262,6 +263,7 @@ def train(): _set_group_map_by_name(_default_group_name, group) _set_group_map(0, group) _set_group_map_backend(group, backend) + _add_new_group(group) parallel_helper._set_parallel_ctx(True) paddle.distributed.barrier(group=group)
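
A minimal usage sketch of the relocated `paddle.distributed.all_reduce` entry point introduced by this patch, assuming a 2-rank dynamic-graph launch (the launch command and tensor values are illustrative assumptions; the call signatures follow the docstrings added above):

.. code-block:: python

    # Run with two ranks, e.g.:
    #   python -m paddle.distributed.launch --gpus=0,1 demo.py
    # (the launch command is an assumption; any 2-rank setup works)
    import paddle
    import paddle.distributed as dist

    dist.init_parallel_env()

    # Each rank contributes its own tensor; after all_reduce every rank
    # holds the elementwise sum (ReduceOp.SUM is the default op).
    if dist.get_rank() == 0:
        data = paddle.to_tensor([[4, 5, 6], [4, 5, 6]])
    else:
        data = paddle.to_tensor([[1, 2, 3], [1, 2, 3]])

    # Synchronous call: the new wrapper dispatches to
    # paddle.distributed.communication.stream.all_reduce and waits on the task.
    dist.all_reduce(data)
    print(data)  # [[5, 7, 9], [5, 7, 9]] on 2 GPUs

    # Asynchronous call: in dynamic-graph mode a task object is returned
    # and can be waited on explicitly (the static-graph branch returns None).
    task = dist.all_reduce(data, sync_op=False)
    task.wait()

The stream-level routine that the wrapper delegates to can also be called directly when control over the communication stream is needed, e.g. `from paddle.distributed.communication import stream; stream.all_reduce(data, sync_op=True, use_calc_stream=True)`; with `use_calc_stream=True` the reduction runs on the calculation stream via `allreduce_on_calc_stream`, so no task is returned.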