diff --git a/paddle/fluid/framework/distributed_strategy.proto b/paddle/fluid/framework/distributed_strategy.proto
index 99a6eb6b67472..38831192c8c2b 100644
--- a/paddle/fluid/framework/distributed_strategy.proto
+++ b/paddle/fluid/framework/distributed_strategy.proto
@@ -141,6 +141,7 @@ message PipelineConfig {
 
 message TensorParallelConfig {
   optional int32 tensor_parallel_degree = 1 [ default = 1 ];
+  optional int32 tensor_init_seed = 2 [ default = -1 ];
 }
 
 message DistributedStrategy {
diff --git a/python/paddle/distributed/collective.py b/python/paddle/distributed/collective.py
index 85b8cafd6c315..55f86959c59f2 100644
--- a/python/paddle/distributed/collective.py
+++ b/python/paddle/distributed/collective.py
@@ -98,6 +98,13 @@ def get_group_rank(self, rank):
         else:
             return -1
 
+    def __repr__(self):
+        debug_str = "rank: {}, nranks: {}, id: {}, ranks: ".format(
+            self.rank, self.nranks, self.id)
+        debug_str += ", ".join(map(str, self.ranks))
+        debug_str += ". "
+        return debug_str
+
 
 _global_env = None
 
diff --git a/python/paddle/distributed/fleet/base/distributed_strategy.py b/python/paddle/distributed/fleet/base/distributed_strategy.py
index 25e571dba0c80..640bc00cb6c57 100755
--- a/python/paddle/distributed/fleet/base/distributed_strategy.py
+++ b/python/paddle/distributed/fleet/base/distributed_strategy.py
@@ -923,6 +923,8 @@ def tensor_parallel_configs(self):
         **Notes**:
             **Detailed arguments for tensor_parallel_configs**
             **tensor_parallel_degree**: degree of tensor parallel
+            **tensor_init_seed**: parameter initialization random seed
+
 
         Examples:
 
@@ -931,7 +933,8 @@ def tensor_parallel_configs(self):
             import paddle.distributed.fleet as fleet
             strategy = fleet.DistributedStrategy()
             strategy.tensor_parallel = True
-            strategy.tensor_parallel_configs = {"tensor_parallel_degree": 4}
+            strategy.tensor_parallel_configs = {"tensor_parallel_degree": 4,
+                                                "tensor_init_seed": 123}
 
         """
         return get_msg_dict(self.strategy.tensor_parallel_configs)
diff --git a/python/paddle/distributed/fleet/base/fleet_base.py b/python/paddle/distributed/fleet/base/fleet_base.py
index a7564a23a7cfb..edc4a22dc37e9 100644
--- a/python/paddle/distributed/fleet/base/fleet_base.py
+++ b/python/paddle/distributed/fleet/base/fleet_base.py
@@ -17,6 +17,7 @@
 import warnings
 import paddle
 import os
+import numpy as np
 from paddle.fluid.framework import dygraph_only
 from paddle.fluid import compiler
 from .role_maker import UserDefinedRoleMaker, PaddleCloudRoleMaker, RoleMakerBase
@@ -28,7 +29,7 @@
 from paddle.fluid.dygraph import parallel_helper
 from . import topology as tp
 from .topology import ParallelMode
-from ..meta_parallel import ModelParallel
+from ..meta_parallel import TensorParallel, model_parallel_random_seed
 from ..meta_parallel import PipelineParallel
 from ..meta_optimizers import HybridParallelOptimizer
 from ..meta_optimizers import HybridParallelGradScaler
@@ -279,6 +280,14 @@ def _init_hybrid_parallel_env(self):
 
         self._hcg = tp.HybridCommunicateGroup(self._topology)
 
+        if self.mp_degree > 1:
+            tensor_parallel_configs = self._user_defined_strategy.tensor_parallel_configs
+            tensor_init_seed = tensor_parallel_configs["tensor_init_seed"]
+            if tensor_init_seed == -1:
+                model_parallel_random_seed()
+            else:
+                model_parallel_random_seed(tensor_init_seed)
+
     def get_hybrid_communicate_group(self):
         assert self._hcg is not None
         return self._hcg
@@ -780,8 +789,8 @@ def forward(self, x):
                 last_comm_group_size_MB,
                 find_unused_parameters=self._user_defined_strategy.
                 find_unused_parameters)
-        elif self._hcg.get_parallel_mode() == ParallelMode.MODEL_PARALLEL:
-            distributed_model = ModelParallel(
+        elif self._hcg.get_parallel_mode() == ParallelMode.TENSOR_PARALLEL:
+            distributed_model = TensorParallel(
                 model, self._hcg, strategy=self._user_defined_strategy)
         elif self._hcg.get_parallel_mode() == ParallelMode.PIPELINE_PARALLEL:
             distributed_model = PipelineParallel(
diff --git a/python/paddle/distributed/fleet/base/topology.py b/python/paddle/distributed/fleet/base/topology.py
index 470a4d83aac3f..04525977192be 100644
--- a/python/paddle/distributed/fleet/base/topology.py
+++ b/python/paddle/distributed/fleet/base/topology.py
@@ -28,7 +28,7 @@
 
 class ParallelMode(object):
     DATA_PARALLEL = 0
-    MODEL_PARALLEL = 1
+    TENSOR_PARALLEL = 1
     PIPELINE_PARALLEL = 2
 
 
@@ -155,12 +155,12 @@ def __init__(self, topology):
         _HYBRID_PARALLEL_GROUP = self
 
     def get_parallel_mode(self):
-        # there are three modes : DataParallel / ModelParallel / PipelineParallel
+        # there are three modes : DataParallel / TensorParallel / PipelineParallel
         if self._mp_degree == 1 and self._pp_degree == 1:
             return ParallelMode.DATA_PARALLEL
         elif self._mp_degree > 1 and self._pp_degree == 1:
             # initialize the seed
-            return ParallelMode.MODEL_PARALLEL
+            return ParallelMode.TENSOR_PARALLEL
         elif self._pp_degree > 1:
             return ParallelMode.PIPELINE_PARALLEL
 
diff --git a/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/hybrid_parallel_gradscaler.py b/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/hybrid_parallel_gradscaler.py
index d0e8034f5cae1..c0f671e7e446b 100644
--- a/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/hybrid_parallel_gradscaler.py
+++ b/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/hybrid_parallel_gradscaler.py
@@ -31,7 +31,7 @@ def __init__(self, scaler, hcg):
         self._scaler = scaler
         self._hcg = hcg
         self._is_mp = (
-            self._hcg.get_parallel_mode() == ParallelMode.MODEL_PARALLEL)
+            self._hcg.get_parallel_mode() == ParallelMode.TENSOR_PARALLEL)
 
     def scale(self, var):
         return self._scaler.scale(var)
diff --git a/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/hybrid_parallel_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/hybrid_parallel_optimizer.py
index b7ac298d2223e..00ac019c0d188 100644
--- a/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/hybrid_parallel_optimizer.py
+++ b/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/hybrid_parallel_optimizer.py
@@ -90,12 +90,12 @@ def __init__(self, optimizer, hcg, strategy):
         self._strategy = strategy
         self._hcg = hcg
         self._is_mp = (
-            self._hcg.get_parallel_mode() == ParallelMode.MODEL_PARALLEL)
+            self._hcg.get_parallel_mode() == ParallelMode.TENSOR_PARALLEL)
         self._need_dp = (self._hcg.get_data_parallel_world_size() > 1)
 
         if isinstance(self._inner_opt._grad_clip,
                       ClipGradByGlobalNorm) and self._is_mp:
-            logger.warning("using ClipGradByGlobalNorm in ModelParallel, the origin " \
+            logger.warning("using ClipGradByGlobalNorm in TensorParallel, the origin " \
                     "optmizer'grad clip will be changed.")
             self._inner_opt._grad_clip = HybridParallelClipGrad(
                 self._inner_opt._grad_clip, hcg)
diff --git a/python/paddle/distributed/fleet/meta_parallel/__init__.py b/python/paddle/distributed/fleet/meta_parallel/__init__.py
index ed74d8e744e50..894771a3d5005 100644
--- a/python/paddle/distributed/fleet/meta_parallel/__init__.py
+++ b/python/paddle/distributed/fleet/meta_parallel/__init__.py
@@ -20,7 +20,7 @@
 from .parallel_layers import RNGStatesTracker  # noqa: F401
 from .parallel_layers import model_parallel_random_seed  # noqa: F401
 from .parallel_layers import get_rng_state_tracker  # noqa: F401
-from .model_parallel import ModelParallel  # noqa: F401
+from .tensor_parallel import TensorParallel  # noqa: F401
 from .pipeline_parallel import PipelineParallel  # noqa: F401
 
 __all__ = []
diff --git a/python/paddle/distributed/fleet/meta_parallel/parallel_layers/mp_layers.py b/python/paddle/distributed/fleet/meta_parallel/parallel_layers/mp_layers.py
index af59b16e22aa8..730a7430133e0 100644
--- a/python/paddle/distributed/fleet/meta_parallel/parallel_layers/mp_layers.py
+++ b/python/paddle/distributed/fleet/meta_parallel/parallel_layers/mp_layers.py
@@ -41,6 +41,7 @@ def __init__(self,
         self.rank = tp._HYBRID_PARALLEL_GROUP.get_model_parallel_rank()
 
         self.origin_num_embeddings = num_embeddings
+        self.is_mp = (self.world_size > 1)
 
         per_part_size = (
             num_embeddings + self.world_size - 1) // self.world_size
@@ -50,16 +51,36 @@ def __init__(self,
             per_part_size += 1  # make the last row as the padding index
         self.per_part_size = per_part_size
 
-        self.embedding = paddle.nn.Embedding(
-            per_part_size,
-            embedding_dim,
-            padding_idx=per_part_size - 1,
-            sparse=False,
-            weight_attr=weight_attr,
-            name=name)
-        self.embedding.weight.is_distributed = True
+        self._dtype = self._helper.get_default_dtype()
+        self._size = [per_part_size, embedding_dim]
+        self._weight_attr = weight_attr
+        self._name = name
+
+        if self.is_mp:
+            with get_rng_state_tracker().rng_state():
+                self.weight = self.create_parameter(
+                    attr=self._weight_attr,
+                    shape=self._size,
+                    dtype=self._dtype,
+                    is_bias=False)
+            self.weight[per_part_size - 1] = 0.0
+            self.weight.is_distributed = True
+        else:
+            self.weight = self.create_parameter(
+                attr=self._weight_attr,
+                shape=[num_embeddings, embedding_dim],
+                dtype=self._dtype,
+                is_bias=False)
 
     def forward(self, x):
+        if not self.is_mp:
+            return F.embedding(
+                x,
+                weight=self.weight,
+                padding_idx=None,
+                sparse=False,
+                name=self._name)
+
         origin_input_shape = x.shape
         if len(origin_input_shape) == 2:
             x = paddle.unsqueeze(x, axis=-1)
@@ -72,13 +93,18 @@ def forward(self, x):
         if len(origin_input_shape) == 2:
             x_shard = paddle.squeeze(x_shard, axis=-1)
 
-        emb_out = self.embedding(x_shard)
-        if self.world_size > 1:
-            emb_out = paddle.distributed.collective._mp_allreduce(
-                emb_out,
-                group=self.model_parallel_group,
-                use_calc_stream=True,
-                use_model_parallel=True)
+        emb_out = F.embedding(
+            x_shard,
+            weight=self.weight,
+            padding_idx=self.per_part_size - 1,
+            sparse=False,
+            name=self._name)
+
+        emb_out = paddle.distributed.collective._mp_allreduce(
+            emb_out,
+            group=self.model_parallel_group,
+            use_calc_stream=True,
+            use_model_parallel=True)
         return emb_out
 
 
@@ -96,8 +122,9 @@ def __init__(self,
         )
         self.world_size = tp._HYBRID_PARALLEL_GROUP.get_model_parallel_world_size(
         )
+        self._name = name
+        self.is_mp = (self.world_size > 1)
 
-        self.name = name
         self.gather_output = gather_output
         assert out_features % self.world_size == 0, (
             "Number of column of the weight for linear ({}) must be"
@@ -108,10 +135,20 @@ def __init__(self,
         self._weight_attr = weight_attr
         self._dtype = self._helper.get_default_dtype()
 
-        self.weight = self.create_parameter(
-            shape=[in_features, self.output_size_per_partition],
-            attr=self._weight_attr,
-            dtype=self._dtype)
+        if self.is_mp:
+            with get_rng_state_tracker().rng_state():
+                self.weight = self.create_parameter(
+                    shape=[in_features, self.output_size_per_partition],
+                    attr=self._weight_attr,
+                    dtype=self._dtype,
+                    is_bias=False)
+        else:
+            self.weight = self.create_parameter(
+                shape=[in_features, self.output_size_per_partition],
+                attr=self._weight_attr,
+                dtype=self._dtype,
+                is_bias=False)
+
         self.weight.is_distributed = True
 
         if has_bias:
@@ -119,18 +156,24 @@ def __init__(self,
             self.bias = self.create_parameter(
                 shape=[self.output_size_per_partition],
                 attr=paddle.nn.initializer.Constant(value=0.0),
-                dtype=self._dtype)
+                dtype=self._dtype,
+                is_bias=True)
             self.bias.is_distributed = True
         else:
             self.bias = None
 
     def forward(self, x):
         # use inner api to process identity
-        input_parallel = paddle.distributed.collective._c_identity(
-            x, group=self.model_parallel_group)
+        if self.is_mp:
+            input_parallel = paddle.distributed.collective._c_identity(
+                x, group=self.model_parallel_group)
+        else:
+            input_parallel = x
+
         output_parallel = F.linear(
-            input_parallel, self.weight, self.bias, name=self.name)
-        if self.gather_output:
+            input_parallel, self.weight, self.bias, name=self._name)
+
+        if self.gather_output and self.is_mp:
             output = paddle.distributed.collective._c_concat(
                 output_parallel,
                 nranks=self.world_size,
@@ -155,7 +198,7 @@ def __init__(self,
         self.input_is_parallel = input_is_parallel
         self._weight_attr = weight_attr
         self._dtype = self._helper.get_default_dtype()
-        self.name = name
+        self._name = name
 
         self.model_parallel_group = tp._HYBRID_PARALLEL_GROUP.get_model_parallel_group(
         )
@@ -163,6 +206,7 @@ def __init__(self,
         )
         self.rank = tp._HYBRID_PARALLEL_GROUP.get_model_parallel_rank()
 
+        self.is_mp = (self.world_size > 1)
         assert in_features % self.world_size == 0, (
             "Number of row of the weight for linear ({}) must be"
             " divisible by model parallel size ({})".format(in_features,
@@ -170,22 +214,33 @@ def __init__(self,
 
         self.input_size_per_partition = in_features // self.world_size
 
-        self.weight = self.create_parameter(
-            shape=[self.input_size_per_partition, self.out_features],
-            attr=self._weight_attr,
-            dtype=self._dtype)
+        if self.is_mp:
+            with get_rng_state_tracker().rng_state():
+                self.weight = self.create_parameter(
+                    shape=[self.input_size_per_partition, self.out_features],
+                    attr=self._weight_attr,
+                    dtype=self._dtype,
+                    is_bias=False)
+        else:
+            self.weight = self.create_parameter(
+                shape=[self.input_size_per_partition, self.out_features],
+                attr=self._weight_attr,
+                dtype=self._dtype,
+                is_bias=False)
+
         self.weight.is_distributed = True
 
         if has_bias:
             self.bias = self.create_parameter(
                 shape=[self.out_features],
                 attr=paddle.nn.initializer.Constant(value=0.0),
-                dtype=self._dtype)
+                dtype=self._dtype,
+                is_bias=True)
         else:
             self.bias = None
 
     def forward(self, x):
-        if self.input_is_parallel:
+        if self.input_is_parallel or (not self.is_mp):
             input_parallel = x
         else:
             # split last dim
@@ -195,12 +250,16 @@ def forward(self, x):
                 nranks=self.world_size,
                 group=self.model_parallel_group)
 
-        output_parallel = F.linear(input_parallel, self.weight, name=self.name)
-        output_ = paddle.distributed.collective._mp_allreduce(
-            output_parallel,
-            group=self.model_parallel_group,
-            use_calc_stream=True,
-            use_model_parallel=True)
+        output_parallel = F.linear(input_parallel, self.weight, name=self._name)
+
+        if self.is_mp:
+            output_ = paddle.distributed.collective._mp_allreduce(
+                output_parallel,
+                group=self.model_parallel_group,
+                use_calc_stream=True,
+                use_model_parallel=True)
+        else:
+            output_ = output_parallel
 
         output = output_ + self.bias if self.bias is not None else output_
         return output
diff --git a/python/paddle/distributed/fleet/meta_parallel/parallel_layers/random.py b/python/paddle/distributed/fleet/meta_parallel/parallel_layers/random.py
index 41c9deabd1e11..70daa3b25365e 100644
--- a/python/paddle/distributed/fleet/meta_parallel/parallel_layers/random.py
+++ b/python/paddle/distributed/fleet/meta_parallel/parallel_layers/random.py
@@ -14,6 +14,7 @@
 
 import paddle
 import contextlib
+import numpy as np
 
 __all__ = []
 
@@ -65,14 +66,18 @@ def get_rng_state_tracker():
     return RNG_STATE_TRACKER
 
 
-def model_parallel_random_seed(seed=2048):
+def model_parallel_random_seed(seed=None):
     import paddle.distributed.fleet as fleet
     hcg = fleet.get_hybrid_communicate_group()
     rank = hcg.get_model_parallel_rank()
 
-    local_seed = seed + 1024 + rank
-    global_seed = seed
+    if seed:
+        global_seed = seed
+        local_seed = seed * 1024 + rank * 100
+    else:
+        global_seed = np.random.randint(0, 655350)
+        local_seed = np.random.randint(rank * 10000, (rank + 1) * 10000 - 1)
 
     RNG_STATE_TRACKER.reset()
-    paddle.seed(global_seed)
     RNG_STATE_TRACKER.add(MODEL_PARALLEL_RNG, local_seed)
+    paddle.seed(global_seed)
diff --git a/python/paddle/distributed/fleet/meta_parallel/model_parallel.py b/python/paddle/distributed/fleet/meta_parallel/tensor_parallel.py
similarity index 89%
rename from python/paddle/distributed/fleet/meta_parallel/model_parallel.py
rename to python/paddle/distributed/fleet/meta_parallel/tensor_parallel.py
index 682d7152a42bd..1dbf668d6e13a 100644
--- a/python/paddle/distributed/fleet/meta_parallel/model_parallel.py
+++ b/python/paddle/distributed/fleet/meta_parallel/tensor_parallel.py
@@ -22,15 +22,15 @@
 __all__ = []
 
 
-class ModelParallel(MetaParallelBase):
+class TensorParallel(MetaParallelBase):
     def __init__(self, layers, hcg, **kwargs):
-        super(ModelParallel, self).__init__(layers, hcg, **kwargs)
+        super(TensorParallel, self).__init__(layers, hcg, **kwargs)
 
     def _prepare_for_model(self):
         logger.info("start broadcast mp parameters")
         broadcast_mp_parameters(self._layers, self._hcg)
 
-        logger.info("start broadcast mp parameters")
+        logger.info("start broadcast dp parameters")
         broadcast_dp_parameters(self._layers, self._hcg)
 
         logger.info("mp's parameters is ready")
diff --git a/python/paddle/distributed/fleet/utils/hybrid_parallel_util.py b/python/paddle/distributed/fleet/utils/hybrid_parallel_util.py
index 5521bd5b95283..ddbd6111b4609 100644
--- a/python/paddle/distributed/fleet/utils/hybrid_parallel_util.py
+++ b/python/paddle/distributed/fleet/utils/hybrid_parallel_util.py
@@ -44,7 +44,15 @@ def _apply_collective_grads(parameters, comm_group):
 
     for coalesced_grad, _, _ in coalesced_grads_and_vars:
         # need to div nranks
-        coalesced_grad = coalesced_grad / comm_group.nranks
+        div_factor = paddle.to_tensor(
+            comm_group.nranks, dtype=coalesced_grad.dtype)
+        paddle.fluid.framework._dygraph_tracer().trace_op(
+            type="elementwise_div",
+            inputs={'X': coalesced_grad,
+                    'Y': div_factor},
+            outputs={'Out': coalesced_grad},
+            attrs={'axis': -1})
+
         paddle.distributed.all_reduce(coalesced_grad, group=comm_group)
 
     _split_tensors(coalesced_grads_and_vars)
diff --git a/python/paddle/fluid/tests/unittests/hybrid_parallel_mp_layers.py b/python/paddle/fluid/tests/unittests/hybrid_parallel_mp_layers.py
index dfbef998a2f07..349d5f82dbf54 100644
--- a/python/paddle/fluid/tests/unittests/hybrid_parallel_mp_layers.py
+++ b/python/paddle/fluid/tests/unittests/hybrid_parallel_mp_layers.py
@@ -231,7 +231,7 @@ def test_parallel_embedding(self):
         # model_b
         check_group = dist.new_group(list(range(self.model_parallel_size)))
         integral_w = []
-        partial_w = model_a.embedding.embedding.weight.clone().detach()
+        partial_w = model_a.embedding.weight.clone().detach()
         paddle.distributed.all_gather(integral_w, partial_w, group=check_group)
         result_w = []
         for idx in range(len(integral_w)):
diff --git a/python/paddle/fluid/tests/unittests/new_group.py b/python/paddle/fluid/tests/unittests/new_group.py
index fb7beeee1df2e..c9c4acc3220c7 100644
--- a/python/paddle/fluid/tests/unittests/new_group.py
+++ b/python/paddle/fluid/tests/unittests/new_group.py
@@ -27,6 +27,7 @@ def __init__(self):
 
     def test_all(self):
         gp = paddle.distributed.new_group([0, 1])
+        print("gp info:", gp)
         print("test new group api ok")
 
         tmp = np.array([0, 0, 0])
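Usage note (not part of the patch): a minimal dygraph sketch of how the new tensor_init_seed option is consumed. _init_hybrid_parallel_env in fleet_base.py reads tensor_parallel_configs["tensor_init_seed"] and forwards it to model_parallel_random_seed, so a non-default value makes sharded-parameter initialization reproducible across runs. The toy Demo layer, the vocabulary/hidden sizes, and the launch command are illustrative assumptions; the strategy keys mirror the docstring example in this diff, and VocabParallelEmbedding is assumed to be exported from fleet.meta_parallel as in the current tree.

# Assumed launch: python -m paddle.distributed.launch --gpus "0,1" demo.py
import paddle
import paddle.distributed.fleet as fleet
from paddle.distributed.fleet.meta_parallel import VocabParallelEmbedding

strategy = fleet.DistributedStrategy()
strategy.hybrid_configs = {"dp_degree": 1, "mp_degree": 2, "pp_degree": 1}
# tensor_init_seed defaults to -1, which lets model_parallel_random_seed()
# pick random seeds; any other value makes parameter init deterministic.
strategy.tensor_parallel_configs = {"tensor_parallel_degree": 2,
                                    "tensor_init_seed": 123}
fleet.init(is_collective=True, strategy=strategy)


class Demo(paddle.nn.Layer):
    def __init__(self):
        super(Demo, self).__init__()
        # sharded across the two tensor-parallel ranks; the weight is created
        # under get_rng_state_tracker().rng_state(), so each shard differs
        self.embedding = VocabParallelEmbedding(100, 32)

    def forward(self, x):
        return self.embedding(x)


# wrapped as TensorParallel (renamed from ModelParallel) since mp_degree > 1
model = fleet.distributed_model(Demo())
out = model(paddle.to_tensor([[4, 5, 6]], dtype="int64"))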
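Also worth noting for review: with an explicit seed, model_parallel_random_seed now gives every tensor-parallel rank the same global seed (so replicated parameters stay aligned) and a distinct local seed for the RNG state tracker used by the sharded weights. The helper below is a standalone sketch that only mirrors that arithmetic from random.py; it is not part of the patch, and the name derive_seeds is made up for illustration.

import numpy as np


def derive_seeds(rank, seed=None):
    # mirrors the branch added in model_parallel_random_seed()
    if seed:
        return seed, seed * 1024 + rank * 100  # (global_seed, local_seed)
    return (np.random.randint(0, 655350),
            np.random.randint(rank * 10000, (rank + 1) * 10000 - 1))


for rank in range(2):
    print(rank, derive_seeds(rank, seed=123))
# prints: 0 (123, 125952) and 1 (123, 126052)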