[AutoParallel] Fix pipeline parallel getting None grad on non-computation ranks. (PaddlePaddle#60214)

* [AutoParallel] Fix pipeline parallel getting a None grad on non-computation ranks (see the sketch after the changed-files summary below).

* Fix optimizer update when the parameter is uninitialized.

* Fix gradient clipping.

---------

Co-authored-by: LiYuRio <liyuruijx@163.com>
2 people authored and Wanglongzhi2001 committed Jan 7, 2024
1 parent 5810d48 commit d7ef595
Showing 11 changed files with 51 additions and 105 deletions.
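For context on the first bullet: under semi-auto parallel pipeline parallelism, a rank only holds real data for the stages whose process mesh it belongs to, so gradients of parameters from other stages used to surface as `None`, which broke global-norm clipping and the optimizer step. The sketch below is not part of the commit; it is a minimal, hypothetical two-rank illustration of that scenario (mesh layout, shapes, and the launch command are assumptions) showing where the defined-but-uninitialized gradients now appear.

```python
# Hypothetical repro sketch; run with:
#   python -m paddle.distributed.launch --devices=0,1 pp_grad_sketch.py
import paddle
import paddle.distributed as dist

mesh0 = dist.ProcessMesh([0], dim_names=["pp"])  # pipeline stage 0
mesh1 = dist.ProcessMesh([1], dim_names=["pp"])  # pipeline stage 1

w0 = dist.shard_tensor(
    paddle.create_parameter([8, 8], dtype="float32"), mesh0, [dist.Replicate()]
)
w1 = dist.shard_tensor(
    paddle.create_parameter([8, 8], dtype="float32"), mesh1, [dist.Replicate()]
)

x = dist.shard_tensor(paddle.ones([8, 8]), mesh0, [dist.Replicate()])
y = paddle.matmul(x, w0)                        # runs on stage 0
y = dist.reshard(y, mesh1, [dist.Replicate()])  # send activation to stage 1
loss = paddle.matmul(y, w1).mean()              # runs on stage 1
loss.backward()

# On rank 0, w1 belongs to the other stage's mesh: before this commit its
# grad came back as None; after it, .grad is a defined DistTensor whose
# local value is simply uninitialized, so downstream code can skip it.
for p in (w0, w1):
    g = p.grad
    print(p.name, "has grad object:", g is not None,
          "locally initialized:", g is not None and g._is_initialized())
```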
3 changes: 2 additions & 1 deletion paddle/fluid/eager/accumulation/accumulation_node.cc
@@ -178,7 +178,8 @@ GradNodeAccumulation::operator()(

if (!weak_grad_.expired() && !is_new_grad) {
auto grad = weak_grad_.lock();
if (grad_out.defined() && grad_out.initialized()) {
if (grad_out.defined() &&
(grad_out.is_dist_tensor() || grad_out.initialized())) {
CopyOrAddTensor(grad.get(), grad_out, is_fake_empty_);
}
// else { do nothing since there is no valid value in grad out tensor }
7 changes: 7 additions & 0 deletions paddle/fluid/eager/utils.cc
@@ -617,6 +617,13 @@ void EagerUtils::FillZeroForEmptyGradInput(paddle::Tensor* in_grad,
*(static_cast<phi::distributed::DistTensor*>(in_grad->impl().get())
->unsafe_mutable_value()) =
*(static_cast<phi::DenseTensor*>(tensor_with_zero.impl().get()));
} else {
*(static_cast<phi::distributed::DistTensor*>(in_grad->impl().get())
->unsafe_mutable_value()) =
phi::DenseTensor(
std::make_shared<phi::Allocation>(
nullptr, 0, phi::distributed::GetDefaultPlace()),
phi::DenseTensorMeta());
}
} else {
auto tensor_with_zero =
4 changes: 4 additions & 0 deletions paddle/fluid/pybind/eager_method.cc
@@ -2900,6 +2900,10 @@ static PyObject* tensor__grad_ivar(TensorObject* self,
if (meta && meta->Grad().initialized()) {
return ToPyObject(meta->Grad());
} else {
if (meta && !meta->Grad().initialized() && meta->Grad().impl() &&
meta->Grad().is_dist_tensor()) {
return ToPyObject(meta->Grad(), false);
}
RETURN_PY_NONE
}
EAGER_CATCH_AND_THROW_RETURN_NULL
5 changes: 4 additions & 1 deletion paddle/fluid/pybind/eager_properties.cc
@@ -287,10 +287,13 @@ PyObject* tensor_properties_get_grad(TensorObject* self, void* closure) {
EAGER_TRY
VLOG(6) << "Get grad for tensor: " << self->tensor.name();
auto meta = egr::EagerUtils::nullable_autograd_meta(self->tensor);
VLOG(6) << meta << " initialized: " << meta->Grad().initialized();
if (meta && meta->Grad().initialized()) {
return ToPyObject(meta->Grad());
} else {
if (meta && !meta->Grad().initialized() && meta->Grad().impl() &&
meta->Grad().is_dist_tensor()) {
return ToPyObject(meta->Grad(), false);
}
RETURN_PY_NONE
}
EAGER_CATCH_AND_THROW_RETURN_NULL
3 changes: 2 additions & 1 deletion paddle/fluid/pybind/eager_utils.cc
@@ -2475,7 +2475,8 @@ paddle::Tensor PyTensorHook::operator()(const paddle::Tensor& var) {

PyObject* res = nullptr;
try {
PyObject* p_tmp_var = ToPyObject(var);
bool return_py_none_if_not_initialize = var.is_dist_tensor() ? false : true;
PyObject* p_tmp_var = ToPyObject(var, return_py_none_if_not_initialize);
res = PyObject_CallFunctionObjArgs(py_func_, p_tmp_var, nullptr);
Py_DECREF(p_tmp_var);
} catch (platform::EnforceNotMet& e) {
4 changes: 2 additions & 2 deletions paddle/phi/api/yaml/generator/dist_bw_api_gen.py
@@ -126,7 +126,7 @@
MULTI_SINGLE_OUT_CREATION_TEMPLATE_NO_SPMD = """
auto dist_out_{idx} = SetKernelDistOutput({name});
auto dense_out_{idx} = dist_out_{idx} ? dist_out_{idx}->unsafe_mutable_value() : nullptr;
if (dense_out_{idx} && !rank_is_in_current_mesh && dist_out_{idx}->defined()) {{
if (dense_out_{idx} && !rank_is_in_current_mesh && !dist_out_{idx}->defined()) {{
*dense_out_{idx} = phi::DenseTensor(
std::make_shared<phi::Allocation>(nullptr, 0, phi::distributed::GetDefaultPlace()),
phi::DenseTensorMeta());
@@ -137,7 +137,7 @@
CreateKernelDistOutput({name}, !rank_is_in_current_mesh, spmd_info.second[{idx}]);
phi::distributed::DistTensor* dist_out_{idx} = shared_dist_out_{idx}.get();
phi::DenseTensor* dense_out_{idx} = dist_out_{idx} ? dist_out_{idx}->unsafe_mutable_value() : nullptr;
if (dense_out_{idx} && !rank_is_in_current_mesh && dist_out_{idx}->defined()) {{
if (dense_out_{idx} && !rank_is_in_current_mesh && !dist_out_{idx}->defined()) {{
*dense_out_{idx} = phi::DenseTensor(
std::make_shared<phi::Allocation>(nullptr, 0, phi::distributed::GetDefaultPlace()),
phi::DenseTensorMeta());
1 change: 1 addition & 0 deletions paddle/phi/infermeta/unary.cc
@@ -4023,6 +4023,7 @@ void SequenceMaskScalarInferMeta(const MetaTensor& x,

void SquaredL2NormInferMeta(const MetaTensor& x, MetaTensor* out) {
out->set_dims({1});
out->set_dtype(x.dtype());
}

void SqueezeInferMeta(const MetaTensor& x,
50 changes: 0 additions & 50 deletions python/paddle/distributed/auto_parallel/api.py
@@ -515,9 +515,7 @@ def __init__(self, optimizer, shard_fn=None):
optimizer.helper = paddle.base.layer_helper.LayerHelper(
optimizer.__class__.__name__
)
# solve global_clip for auto_parallel
self._shard_clip = False
self._generate_flag = False
if (
hasattr(optimizer, "_grad_clip")
and optimizer._grad_clip is not None
@@ -564,40 +562,15 @@ def _shard_accumulator(self, param):
placements=placements,
)

def generate_pp_mesh(self, all_process_ids=[]):
pp_mesh = None
if len(all_process_ids) <= 1:
return pp_mesh
else:
mesh = np.array(all_process_ids)
for i in range(mesh.shape[-1]):
ranks = mesh[:, i].tolist()
if dist.get_rank() in ranks:
pp_mesh = dist.ProcessMesh(ranks)
return pp_mesh

def step(self):
if not isinstance(self._inner_opt._parameter_list[0], dict):
params_grads = []
all_process_ids = []
for param in self._inner_opt._parameter_list:
if param.stop_gradient:
continue
if param._grad_ivar() is not None:
grad_var = param._grad_ivar()
params_grads.append((param, grad_var))
if (
not self._generate_flag
and self._shard_clip
and param.is_dist()
):
if param.process_mesh.process_ids not in all_process_ids:
all_process_ids.append(param.process_mesh.process_ids)
if not self._generate_flag and self._shard_clip:
self._inner_opt._grad_clip._pp_mesh = self.generate_pp_mesh(
all_process_ids
)
self._generate_flag = True
for p, g in params_grads:
self._shard_accumulator(p)
self._inner_opt._apply_optimize(
@@ -606,36 +579,13 @@ def step(self):
else:
for param_group in self._inner_opt._param_groups:
params_grads = defaultdict(lambda: [])
all_process_ids = []
shard_clip_flag = False
for param in param_group['params']:
if param.stop_gradient:
continue
if param._grad_ivar() is not None:
grad_var = param._grad_ivar()
params_grads['params'].append((param, grad_var))
if (
not self._generate_flag
and "grad_clip" in param_group.keys()
and isinstance(
param_group["grad_clip"],
paddle.nn.ClipGradByGlobalNorm,
)
and param.is_dist()
):
if (
param.process_mesh.process_ids
not in all_process_ids
):
all_process_ids.append(
param.process_mesh.process_ids
)
shard_clip_flag = True

if shard_clip_flag:
param_group["grad_clip"]._pp_mesh = self.generate_pp_mesh(
all_process_ids
)
params_grads.update(
{k: v for k, v in param_group.items() if k != 'params'}
)
64 changes: 17 additions & 47 deletions python/paddle/nn/clip.py
@@ -18,6 +18,7 @@

import paddle
import paddle.autograd as imperative_base
import paddle.distributed as dist
from paddle import _C_ops
from paddle.base import core, framework, unique_name
from paddle.base.data_feeder import check_variable_and_dtype
@@ -661,8 +662,6 @@ def __init__(
# are so many hard code depends on `add_n` in the legacy static
# manual hybrid-parallel.
self._async_add_n = None
# just for auto parallel.
self._pp_mesh = None

def __str__(self):
return "Gradient Clip By GlobalNorm, global_norm=%f" % (self.clip_norm)
@@ -673,6 +672,8 @@ def _dygraph_clip(self, params_grads):
sum_square_list = []
sum_square_list_fp16 = []
sum_square_list_fp32 = []
src_mesh = params_grads[0][0].process_mesh

for p, g in params_grads:
if g is None:
continue
@@ -689,6 +690,14 @@
merge_grad = get_tensor_from_selected_rows(merge_grad)

sum_square = _squared_l2_norm(merge_grad)

# if the gradient mesh is not equal to src mesh
# do reshard to get the result of squared_l2 from other pp stage mesh
if src_mesh is not None and g.process_mesh != src_mesh:
sum_square = dist.reshard(
sum_square, src_mesh, sum_square.placements
)

if (
sum_square.dtype == core.VarDesc.VarType.FP16
or sum_square.dtype == core.VarDesc.VarType.BF16
@@ -715,64 +724,21 @@ def async_add_n(var_list):
global_norm_var = []
if len(sum_square_list_fp16) > 0:
global_norm_var_fp16 = async_add_n(sum_square_list_fp16)
if self._pp_mesh is not None:
# sync pp
global_norm_var_fp16 = (
paddle.distributed.auto_parallel.api.dtensor_from_local(
global_norm_var_fp16._local_value().reshape([-1]),
self._pp_mesh,
[paddle.distributed.Partial()],
)
)
global_norm_var_fp16 = paddle.distributed.reshard(
global_norm_var_fp16,
self._pp_mesh,
[paddle.distributed.Replicate()],
)
global_norm_var.append(global_norm_var_fp16.astype(sum_dtype))
if len(sum_square_list_fp32) > 0:
global_norm_var_fp32 = async_add_n(sum_square_list_fp32)
if self._pp_mesh is not None:
# sync pp
global_norm_var_fp32 = (
paddle.distributed.auto_parallel.api.dtensor_from_local(
global_norm_var_fp32._local_value().reshape([-1]),
self._pp_mesh,
[paddle.distributed.Partial()],
)
)
global_norm_var_fp32 = paddle.distributed.reshard(
global_norm_var_fp32,
self._pp_mesh,
[paddle.distributed.Replicate()],
)
if sum_dtype == 'float32':
global_norm_var.append(global_norm_var_fp32)
else:
global_norm_var.append(global_norm_var_fp32.astype(sum_dtype))
if len(sum_square_list) > 0:
global_norm_var_fp64 = async_add_n(sum_square_list)
if self._pp_mesh is not None:
# sync pp
global_norm_var_fp64 = (
paddle.distributed.auto_parallel.api.dtensor_from_local(
global_norm_var_fp64._local_value().reshape([-1]),
self._pp_mesh,
[paddle.distributed.Partial()],
)
)
global_norm_var_fp64 = paddle.distributed.reshard(
global_norm_var_fp64,
self._pp_mesh,
[paddle.distributed.Replicate()],
)
global_norm_var.append(global_norm_var_fp64)
if self._pp_mesh is not None:
global_norm_var = [t._local_value() for t in global_norm_var]

global_norm_var = async_add_n(global_norm_var)
global_norm_var = paddle.sqrt(global_norm_var)
max_global_norm = paddle.full(
shape=[], dtype=global_norm_var.dtype, fill_value=self.clip_norm
shape=[], dtype=sum_dtype, fill_value=self.clip_norm
)

need_clip = False
@@ -800,6 +766,10 @@ def async_add_n(var_list):
if clip_var.dtype != g.dtype
else clip_var
)
if clip_input.process_mesh != g.process_mesh:
clip_input = paddle.distributed.reshard(
clip_input, g.process_mesh, clip_input.placements
)
new_grad = paddle.multiply(g, clip_input)
params_and_grads.append((p, new_grad))
else:
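For reference, the clip.py changes above drop the `_pp_mesh` bookkeeping and instead reshard each squared-norm partial onto the first parameter's mesh before summing, then reshard the clip coefficient back onto each gradient's mesh. The following is a condensed, hedged sketch of that pattern as a standalone helper, not the actual `ClipGradByGlobalNorm` code; it assumes `params_grads` is a list of `(param, grad)` pairs of DistTensors and ignores the fp16/fp32/fp64 bucketing in the real implementation.

```python
import paddle
import paddle.distributed as dist

def clip_grads_by_global_norm(params_grads, clip_norm):
    # Treat the first parameter's mesh as the "home" mesh for the reduction.
    src_mesh = params_grads[0][0].process_mesh

    squares = []
    for _, g in params_grads:
        if g is None:
            continue
        sq = paddle.square(g).sum()
        # Partial results living on another pipeline stage's mesh are moved
        # to the home mesh so the sum below sees tensors on a single mesh.
        if src_mesh is not None and g.process_mesh != src_mesh:
            sq = dist.reshard(sq, src_mesh, sq.placements)
        squares.append(sq)

    global_norm = paddle.sqrt(paddle.add_n(squares))
    max_norm = paddle.full([], fill_value=clip_norm, dtype=global_norm.dtype)
    clip_coef = max_norm / paddle.maximum(global_norm, max_norm)

    clipped = []
    for p, g in params_grads:
        if g is None:
            clipped.append((p, g))
            continue
        coef = clip_coef
        # Move the coefficient back onto each gradient's own mesh before the
        # element-wise multiply, mirroring the reshard in the diff above.
        if coef.process_mesh != g.process_mesh:
            coef = dist.reshard(coef, g.process_mesh, coef.placements)
        clipped.append((p, paddle.multiply(g, coef.astype(g.dtype))))
    return clipped
```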
13 changes: 11 additions & 2 deletions python/paddle/optimizer/optimizer.py
@@ -1193,15 +1193,24 @@ def _create_optimization_pass(
self._set_auxiliary_var('found_inf', False)
if isinstance(parameters_and_grads, list):
for param_and_grad in parameters_and_grads:
if param_and_grad[1] is None:
# Parameters can be uninitialized in pipeline parallel of semi-auto parallel.
# Since gradient clip and parameters update mixed up in one interface, so we
# need to filter again here.
if (
param_and_grad[1] is None
or not param_and_grad[1]._is_initialized()
):
continue
if param_and_grad[0].stop_gradient is False:
self._append_optimize_op(
target_block, param_and_grad
)
else:
for param_and_grad in parameters_and_grads['params']:
if param_and_grad[1] is None:
if (
param_and_grad[1] is None
or not param_and_grad[1]._is_initialized()
):
continue
if param_and_grad[0].stop_gradient is False:
param_grad_dict = {}
@@ -209,7 +209,7 @@ def test_dp_mp_sp_demo_net(self):
for param, param_base in zip(
self.dp_mp_sp_parameters, self.base_parameters
):
if param.grad is not None:
if param.grad._is_initialized():
self.check_tensor_eq(param, param_base)
self.check_tensor_eq(param.grad, param_base.grad)

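The test diff above reflects the new user-facing contract: on ranks outside a parameter's mesh, `param.grad` is no longer `None` but a defined DistTensor with no local allocation, so presence checks should go through `_is_initialized()`. A small illustrative helper (not from the PR):

```python
def has_local_grad(param):
    # Non-computation ranks used to see param.grad == None; after this commit
    # they see a defined DistTensor whose local value is uninitialized, so the
    # initialization check is what distinguishes "real" gradients.
    g = param.grad
    return g is not None and g._is_initialized()
```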
