Remove unnecessary exports in distributed.communication and move wait & barrier #48396

Merged 3 commits on Nov 28, 2022
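This PR removes the barrier and wait re-exports from paddle.distributed.collective, moves both implementations into paddle.distributed.communication.group, and removes the explicit __all__ list from paddle.distributed.communication. The user-facing entry points are unchanged; a minimal sketch (assuming a multi-process run started with python -m paddle.distributed.launch):

    import paddle
    import paddle.distributed as dist

    dist.init_parallel_env()
    data = paddle.randn([2, 3])
    dist.all_reduce(data, sync_op=True)
    dist.wait(data)   # now implemented in paddle.distributed.communication.group
    dist.barrier()    # likewise moved out of collective.py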
4 changes: 2 additions & 2 deletions python/paddle/distributed/__init__.py
@@ -27,10 +27,8 @@
from paddle.distributed.fleet.dataset import QueueDataset # noqa: F401
from paddle.distributed.fleet.base.topology import ParallelMode # noqa: F401

from .collective import barrier # noqa: F401
from .collective import split # noqa: F401
from .collective import new_group # noqa: F401
from .collective import wait # noqa: F401

from .communication import (
    stream,
@@ -53,6 +51,8 @@
    is_initialized,
    destroy_process_group,
    get_group,
    wait,
    barrier,
) # noqa: F401

from .auto_parallel import shard_op # noqa: F401
129 changes: 1 addition & 128 deletions python/paddle/distributed/collective.py
@@ -13,13 +13,10 @@
# limitations under the License.

import datetime
from ..fluid.layer_helper import LayerHelper
from ..fluid.framework import in_dygraph_mode
from ..fluid.framework import _non_static_mode
from ..fluid.layers.tensor import fill_constant
import paddle
import paddle.fluid.core as core
from paddle import _legacy_C_ops
from .fleet.layers.mpu.mp_ops import split # noqa: F401
from .fleet.layers.mpu.mp_ops import _c_identity # noqa: F401
from .fleet.layers.mpu.mp_ops import _c_concat # noqa: F401
@@ -160,60 +157,6 @@ def _new_process_group_impl(
    return pg


def barrier(group=None):
    """

    Barrier among all participants in the group.

    Args:
        group (Group): The group instance returned by new_group, or None for the global default group.

    Returns:
        None.

    Examples:
        .. code-block:: python

            import paddle
            from paddle.distributed import init_parallel_env

            paddle.set_device('gpu:%d' % paddle.distributed.ParallelEnv().dev_id)
            init_parallel_env()
            paddle.distributed.barrier()
    """
    if group is not None and not group.is_member():
        return

    if in_dygraph_mode():
        group = _get_default_group() if group is None else group
        place = paddle.fluid.framework._current_expected_place()
        if isinstance(place, paddle.fluid.core.CPUPlace):
            task = group.process_group.barrier()
        else:
            device_id = place.get_device_id()
            task = group.process_group.barrier(device_id)
        task.wait()
        return

    ring_id = 0 if group is None else group.id

    temp = fill_constant([1], dtype="int32", value="1")
    if _non_static_mode():
        return _legacy_C_ops.barrier(temp, temp, 'ring_id', ring_id)

    op_type = 'barrier'

    if not isinstance(ring_id, int):
        raise ValueError("The type of 'group' for barrier must be int.")
    helper = LayerHelper(op_type, **locals())
    helper.append_op(
        type=op_type,
        inputs={'X': [temp]},
        outputs={'Out': [temp]},
        attrs={'ring_id': ring_id},
    )


# _custom_gid provides a way for users to
# set the group id, which is usually useful
# to be compatible with the static mode.
@@ -356,78 +299,8 @@ def new_group(ranks=None, backend=None, timeout=_default_timeout):
    tmp = (
        paddle.to_tensor([1], dtype="int32")
        if _non_static_mode()
        else fill_constant([0], dtype="int32", value="1")
        else paddle.full([0], 1, dtype="int32")
    )
    paddle.distributed.all_reduce(tmp, sync_op=True)
    paddle.distributed.wait(tmp)
    return gp


def wait(tensor, group=None, use_calc_stream=True):
    """

    Wait to sync the stream for the group.

    Args:
        tensor (Tensor): The Tensor used before sync.
        group (Group): The Group instance to perform sync.
        use_calc_stream (bool): Whether to use the calculation stream (True) or the communication stream (False).
            Defaults to True.

    Returns:
        None.

    Examples:
        .. code-block:: python

            import paddle

            paddle.distributed.init_parallel_env()
            tindata = paddle.randn(shape=[2, 3])
            paddle.distributed.all_reduce(tindata, sync_op=True)
            paddle.distributed.wait(tindata)

    """

    if group is not None and not group.is_member():
        return

    ring_id = 0 if group is None else group.id

    if use_calc_stream:
        _sync_calc_stream(tensor)
    else:
        _sync_comm_stream(tensor, ring_id)


def _sync_calc_stream(tensor):

    if _non_static_mode():
        return _legacy_C_ops.c_sync_calc_stream(tensor, tensor)

    op_type = 'c_sync_calc_stream'

    helper = LayerHelper(op_type, **locals())
    helper.append_op(
        type=op_type,
        inputs={'X': [tensor]},
        outputs={'Out': [tensor]},
    )


def _sync_comm_stream(tensor, ring_id=0):

    if _non_static_mode():
        return _legacy_C_ops.c_sync_comm_stream(
            [tensor], [tensor], 'ring_id', ring_id
        )

    op_type = 'c_sync_comm_stream'

    helper = LayerHelper(op_type, **locals())
    helper.append_op(
        type=op_type,
        inputs={'X': [tensor]},
        outputs={'Out': [tensor]},
        attrs={'ring_id': ring_id},
    )
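For context, the helpers removed above distinguish two synchronization paths: use_calc_stream=True blocks on the default calculation stream, while use_calc_stream=False blocks on the communication stream of the group's ring. A hedged sketch of the second path (it assumes at least two launched ranks; the moved wait keeps the same signature):

    import paddle
    import paddle.distributed as dist

    dist.init_parallel_env()
    group = dist.new_group([0, 1])   # assumes ranks 0 and 1 exist
    x = paddle.randn([4])
    dist.all_reduce(x, group=group, sync_op=False)
    # Block on the group's communication stream rather than the calc stream:
    dist.wait(x, group=group, use_calc_stream=False)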
30 changes: 7 additions & 23 deletions python/paddle/distributed/communication/__init__.py
@@ -21,26 +21,10 @@
from .batch_isend_irecv import batch_isend_irecv, P2POp
from .reduce_scatter import reduce_scatter
from .all_to_all import alltoall, alltoall_single
from .group import is_initialized, destroy_process_group, get_group

__all__ = [
    "ReduceOp",
    "all_gather",
    "all_gather_object",
    "all_reduce",
    "alltoall",
    "alltoall_single",
    "broadcast",
    "reduce",
    "send",
    "scatter",
    "isend",
    "recv",
    "irecv",
    "batch_isend_irecv",
    "P2POp",
    "reduce_scatter",
    "is_initialized",
    "destroy_process_group",
    "get_group",
]
from .group import (
    is_initialized,
    destroy_process_group,
    get_group,
    wait,
    barrier,
)
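With the explicit __all__ gone, the subpackage's surface is simply whatever its top-level imports bring in, and paddle/distributed/__init__.py re-exports the names it needs. A quick check (expected behavior, not part of this diff) that both import paths resolve to the same objects:

    import paddle.distributed as dist
    from paddle.distributed.communication.group import barrier, wait

    assert dist.barrier is barrier
    assert dist.wait is wait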
123 changes: 123 additions & 0 deletions python/paddle/distributed/communication/group.py
@@ -13,7 +13,11 @@
# limitations under the License.

import warnings
import paddle
import paddle.distributed as dist
import paddle.fluid.core as core
import paddle.fluid.framework as framework
import paddle.fluid.layer_helper as layer_helper


class Group:
@@ -227,3 +231,122 @@ def get_group(id=0):
        return _GroupManager.group_map_by_id[id]
    warnings.warn("Group {} is not initialized.".format(id))
    return None


def _sync_calc_stream(tensor):
    if framework._non_static_mode():
        return paddle._legacy_C_ops.c_sync_calc_stream(tensor, tensor)

    op_type = 'c_sync_calc_stream'
    helper = layer_helper.LayerHelper(op_type, **locals())
    helper.append_op(
        type=op_type,
        inputs={'X': [tensor]},
        outputs={'Out': [tensor]},
    )


def _sync_comm_stream(tensor, ring_id=0):
    if framework._non_static_mode():
        return paddle._legacy_C_ops.c_sync_comm_stream(
            [tensor], [tensor], 'ring_id', ring_id
        )

    op_type = 'c_sync_comm_stream'
    helper = layer_helper.LayerHelper(op_type, **locals())
    helper.append_op(
        type=op_type,
        inputs={'X': [tensor]},
        outputs={'Out': [tensor]},
        attrs={'ring_id': ring_id},
    )


def wait(tensor, group=None, use_calc_stream=True):
    """

    Wait to sync the stream for the group.

    Args:
        tensor (Tensor): The Tensor used before sync.
        group (Group): The Group instance to perform sync.
        use_calc_stream (bool): Whether to use the calculation stream (True) or the communication stream (False).
            Defaults to True.

    Returns:
        None.

    Examples:
        .. code-block:: python

            import paddle

            paddle.distributed.init_parallel_env()
            tindata = paddle.randn(shape=[2, 3])
            paddle.distributed.all_reduce(tindata, sync_op=True)
            paddle.distributed.wait(tindata)

    """
    if group is not None and not group.is_member():
        return

    if use_calc_stream:
        _sync_calc_stream(tensor)
    else:
        ring_id = 0 if group is None else group.id
        _sync_comm_stream(tensor, ring_id)


def barrier(group=None):
    """

    Barrier among all participants in the group.

    Args:
        group (Group): The group instance returned by new_group, or None for the global default group.

    Returns:
        None.

    Examples:
        .. code-block:: python

            import paddle
            from paddle.distributed import init_parallel_env

            paddle.set_device('gpu:%d' % paddle.distributed.ParallelEnv().dev_id)
            init_parallel_env()
            paddle.distributed.barrier()
    """
    if group is not None and not group.is_member():
        return

    if framework.in_dygraph_mode():
        group = _get_global_group() if group is None else group
        place = framework._current_expected_place()
        if isinstance(place, core.CPUPlace):
            task = group.process_group.barrier()
        else:
            device_id = place.get_device_id()
            task = group.process_group.barrier(device_id)
        task.wait()
        return

    ring_id = 0 if group is None else group.id

    barrier_tensor = paddle.full([1], 1, dtype="int32")
    if framework._non_static_mode():
        return paddle._legacy_C_ops.barrier(
            barrier_tensor, barrier_tensor, 'ring_id', ring_id
        )

    op_type = 'barrier'
    if not isinstance(ring_id, int):
        raise ValueError("The type of 'group' for barrier must be int.")
    helper = layer_helper.LayerHelper(op_type, **locals())
    helper.append_op(
        type=op_type,
        inputs={'X': [barrier_tensor]},
        outputs={'Out': [barrier_tensor]},
        attrs={'ring_id': ring_id},
    )
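As a usage note, barrier is a rendezvous: every rank in the group blocks until all ranks have arrived. A sketch (hypothetical file name; assumes a multi-rank launch) in which rank 0 writes a checkpoint and every rank waits before reading it back:

    import paddle
    import paddle.distributed as dist

    dist.init_parallel_env()
    if dist.get_rank() == 0:
        paddle.save({"step": 0}, "ckpt.pdparams")
    dist.barrier()   # no rank proceeds until rank 0 has saved
    state = paddle.load("ckpt.pdparams")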
(The remaining hunks update call sites in a file whose path is not shown in this capture:)
@@ -34,7 +34,6 @@
from paddle.distributed.collective import (
    _get_global_group,
    new_group,
    wait,
)

from ...utils.internal_storage import ParamStorage, GradStorage
@@ -174,7 +173,7 @@ def _sync_params_and_buffers(self):
            )

            # Multi stream operation will be supported later
            wait(tensor=p, group=self.group, use_calc_stream=True)
            dist.wait(tensor=p, group=self.group, use_calc_stream=True)

    def _generate_master_params(self, trainable_params):
        if self.offload:
@@ -464,7 +463,7 @@ def _broadcast_params(self):
            )

            # Multi stream operation will be supported later
            wait(
            dist.wait(
                tensor=internal_storage.buffer,
                group=self.group,
                use_calc_stream=True,
(A second file, path not shown in this capture, receives the same call-site updates:)
@@ -318,7 +318,7 @@ def __sync_buffers(self):
                buffer, self._global_root_rank, self._group, sync_op=True
            )
            # Multi stream operation will be supported later
            collective.wait(tensor=buffer, group=self._group, use_calc_stream=True)
            dist.wait(tensor=buffer, group=self._group, use_calc_stream=True)

    def __getattr__(self, name):
        """Forward missing attributes to wrapped layer."""
@@ -382,7 +382,7 @@ def cleanup():
                )

                # Multi stream operation will be supported later
                collective.wait(
                dist.wait(
                    tensor=param.grad,
                    group=self._group,
                    use_calc_stream=True,
@@ -448,7 +448,7 @@ def cleanup():
                )

                # Multi stream operation will be supported later
                collective.wait(
                dist.wait(
                    tensor=grad_storage.buffer,
                    group=self._group,
                    use_calc_stream=True,