[hybrid performance] Grad fuse for gradient merge under pipeline mode
FeixLiu authored Aug 20, 2021
1 parent f6015d0 commit 4d9b2d6
Showing 10 changed files with 534 additions and 11 deletions.
1 change: 1 addition & 0 deletions paddle/fluid/framework/distributed_strategy.proto
@@ -200,6 +200,7 @@ message DistributedStrategy {
optional int32 fuse_grad_size_in_num = 31 [ default = 8 ];
optional bool calc_comm_same_stream = 32 [ default = false ];
optional bool asp = 33 [ default = false ];
optional bool fuse_grad_merge = 34 [ default = false ];

optional RecomputeConfig recompute_configs = 101;
optional AMPConfig amp_configs = 102;
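The new proto field is exposed to users through the fuse_grad_merge property added to fleet's DistributedStrategy later in this commit. A minimal sketch of turning it on; the pipeline and fuse_all_reduce_ops settings around it are illustrative assumptions, not requirements introduced by this change:

import paddle.distributed.fleet as fleet

strategy = fleet.DistributedStrategy()
strategy.pipeline = True              # gradient merge runs under pipeline mode
strategy.fuse_all_reduce_ops = True   # allreduce the fused gradient buffers
strategy.fuse_grad_merge = True       # the new flag (field 34, default False)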
65 changes: 59 additions & 6 deletions paddle/fluid/operators/coalesce_tensor_op.cc
@@ -20,10 +20,49 @@
#include "paddle/fluid/framework/var_type.h"
#include "paddle/fluid/operators/math/math_function.h"
#include "paddle/fluid/platform/device_memory_aligment.h"
#ifdef PADDLE_WITH_ASCEND_CL
#include "paddle/fluid/operators/npu_op_runner.h"
#endif

namespace paddle {
namespace operators {

// Fills a LoDTensor with a constant, dispatching on the runtime dtype via
// framework::VisitDataType; the NPU branch uses FillNpuTensorWithConstant.
template <typename DeviceContext>
struct FillConstantVisitor {
FillConstantVisitor(const DeviceContext &dev_ctx,
framework::LoDTensor *tensor, const float value)
: dev_ctx_(dev_ctx), tensor_(tensor), value_(value) {}

template <typename T>
void apply(typename std::enable_if<std::is_same<T, int8_t>::value ||
std::is_same<T, int16_t>::value>::type * =
nullptr) const {
PADDLE_THROW(platform::errors::InvalidArgument(
"Not support data type for set_constant attr"));
}

template <typename T>
void apply(typename std::enable_if<!(std::is_same<T, int8_t>::value ||
std::is_same<T, int16_t>::value)>::type
* = nullptr) const {
#ifdef PADDLE_WITH_ASCEND_CL
if (platform::is_npu_place(dev_ctx_.GetPlace())) {
FillNpuTensorWithConstant<T>(tensor_, static_cast<T>(value_));
} else {
math::SetConstant<DeviceContext, T> set_constant;
set_constant(dev_ctx_, tensor_, static_cast<T>(value_));
}
#else
math::SetConstant<DeviceContext, T> set_constant;
set_constant(dev_ctx_, tensor_, static_cast<T>(value_));
#endif
}

const DeviceContext &dev_ctx_;
framework::LoDTensor *tensor_;
float value_;
};

template <typename DeviceContext, typename T>
class CoalesceTensorOpKernel : public framework::OpKernel<T> {
public:
@@ -70,6 +109,7 @@ class CoalesceTensorOpKernel : public framework::OpKernel<T> {
auto in_tensors = context.MultiInput<framework::LoDTensor>("Input");
bool use_align = context.Attr<bool>("use_align");
auto align_size = context.Attr<int>("align_size");
auto size_of_dtype = context.Attr<int>("user_defined_size_of_dtype");

if (context.Attr<bool>("check_name")) {
for (size_t i = 0; i < in_var_names.size(); ++i) {
@@ -94,7 +134,9 @@ class CoalesceTensorOpKernel : public framework::OpKernel<T> {
size_t numel = 0;
auto dtype = static_cast<framework::proto::VarType::Type>(
context.Attr<int>("dtype"));
size_t size_of_dtype = framework::SizeOfType(dtype);
if (size_of_dtype == -1) {
size_of_dtype = framework::SizeOfType(dtype);
}
GetMemSizeAndDtype(in_tensors, in_var_names, &numel, size_of_dtype,
context.GetPlace(), use_align, align_size);

@@ -121,10 +163,9 @@ class CoalesceTensorOpKernel : public framework::OpKernel<T> {
: len;
}
} else if (context.Attr<bool>("set_constant")) {
// TODO(Liu yuang) ADD NPU SET_CONSTANT FUNCTION.
math::SetConstant<DeviceContext, T> set_constant;
set_constant(dev_ctx, fused_tensor,
static_cast<T>(context.Attr<float>("constant")));
framework::VisitDataType(
dtype, FillConstantVisitor<DeviceContext>(
dev_ctx, fused_tensor, context.Attr<float>("constant")));
} else if (context.Attr<bool>("persist_output")) {
for (size_t i = 0; i < out_var_names.size(); ++i) {
size_t len = static_cast<size_t>(out_tensors[i]->numel());
@@ -227,10 +268,13 @@ class CoalesceTensorOp : public framework::OperatorWithKernel {
}
auto use_align = ctx->Attrs().Get<bool>("use_align");
auto align_size = ctx->Attrs().Get<int>("align_size");
auto size_of_dtype = ctx->Attrs().Get<int>("user_defined_size_of_dtype");

auto dtype = static_cast<framework::proto::VarType::Type>(
ctx->Attrs().Get<int>("dtype"));
size_t size_of_dtype = framework::SizeOfType(dtype);
if (size_of_dtype == -1) {
size_of_dtype = framework::SizeOfType(dtype);
}

auto alignment = [](size_t size, size_t align_size) {
size_t remaining = size % align_size;
@@ -308,6 +352,15 @@ class CoalesceTensorOpMaker : public framework::OpProtoAndCheckerMaker {
.SetDefault(true);
AddAttr<int>("align_size", "The alignment size when use_align is True")
.SetDefault(-1);
AddAttr<int>("user_defined_size_of_dtype",
"The user defined size of dtype. This is used to coalesce "
"grad vars and merged_grad vars at the same time. For some "
"strategy, the dtype of fused_grad_vars and the dtype of "
"fused_grad_merged_vars are not identical, which will cause "
"the shape of these two coalesced vars are different. To "
"make sure the shape of these two vars are identical with "
"each other, this attr is added.")
.SetDefault(-1);
AddComment(R"DOC(
CoalesceTensor Operator.
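To illustrate the new user_defined_size_of_dtype attribute, here is a hedged sketch of how a pass could coalesce fp16 grad vars while sizing the buffer as if each element were 4 bytes, so the fused fp16 grads line up element-for-element with the fused fp32 merged grads. The block, variable names, and the exact attrs passed are assumptions for illustration, not code from this commit:

from paddle.fluid import core

# Assumed: `block` is a static-graph Block, `fp16_grad_vars` its fp16 grad vars,
# and `fused_fp16_grad` the output variable that will hold the fused buffer.
block.append_op(
    type='coalesce_tensor',
    inputs={'Input': fp16_grad_vars},
    outputs={'Output': fp16_grad_vars,
             'FusedOutput': fused_fp16_grad},
    attrs={
        'copy_data': False,
        'set_constant': False,
        'dtype': core.VarDesc.VarType.FP16,
        # Treat each element as 4 bytes (fp32) so the fused shape matches the
        # fused fp32 merged-grad buffer instead of using SizeOfType(FP16).
        'user_defined_size_of_dtype': 4,
    })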
22 changes: 22 additions & 0 deletions python/paddle/distributed/fleet/base/distributed_strategy.py
@@ -967,6 +967,28 @@ def _calc_comm_same_stream(self, same):
"WARNING: calc_comm_same_stream should have value of boolean type"
)

@property
def fuse_grad_merge(self):
"""
Set whether fuse the grad for gradient merge.
Note: this flag will only effect the gradient merge under pipeline mode
The default value for the fuse_grad_merge is False
Examples:
.. code-block:: python
import paddle.distributed.fleet as fleet
strategy = fleet.DistributedStrategy()
strategy.fuse_param_grad = True
"""
return self.strategy.fuse_grad_merge

@fuse_grad_merge.setter
@is_strict_auto
def fuse_grad_merge(self, fuse_grad_merge):
if isinstance(fuse_grad_merge, bool):
self.strategy.fuse_grad_merge = fuse_grad_merge
else:
print("WARNING: fuse_grad_merge should have value of boolean type")

@property
def fuse_grad_size_in_num(self):
"""
@@ -122,6 +122,9 @@ def remove_param(input_name):
for idx, op in enumerate(block.ops):
if is_optimizer_op(op):
break
# TODO (Yuang Liu): tmp solution for fuse_grad_merge + optimize_cast
if not offload and op.type == 'coalesce_tensor':
continue
for input_name in op.desc.input_arg_names():
if input_name not in param_to_idx:
continue
@@ -341,7 +341,11 @@ def insert_allreduce_ops(block,
if len(allreduce_vars) == 0:
return

if user_defined_strategy and user_defined_strategy.fuse_all_reduce_ops:
if user_defined_strategy and \
user_defined_strategy.fuse_all_reduce_ops and \
not user_defined_strategy.fuse_grad_merge:
# If fuse_grad_merge is enabled, the grad vars have already been fused during
# the gradient merge pass, so they do not need to be fused here
insert_fused_allreduce_ops(block, insert_idx, ring_id, allreduce_vars,
op_role, use_calc_stream,
user_defined_strategy.fuse_grad_size_in_MB)
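For context, when fuse_grad_merge is enabled the code above falls through to the plain per-variable allreduce path, since the gradient merge pass already produced coalesced buffers. A hedged sketch of such a per-var insertion, assuming Paddle's c_allreduce_sum op and a static-graph block; the names and attr set are illustrative, not this repository's helper:

# Assumed: `block`, `insert_idx`, `ring_id`, `use_calc_stream`, `allreduce_vars`.
for var_name in allreduce_vars:
    grad = block.var(var_name)
    # With fuse_grad_merge on, `grad` is typically an already-fused buffer,
    # so one allreduce here covers many parameters at once.
    block._insert_op_without_sync(
        insert_idx,
        type='c_allreduce_sum',
        inputs={'X': grad},
        outputs={'Out': grad},
        attrs={'ring_id': ring_id,
               'use_calc_stream': use_calc_stream})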
@@ -319,7 +319,9 @@ def _insert_allreduce_for_pp(self):
main_block._remove_op(idx)

accumulated_grad_names = self._pp_optimizer._accumulate_gradients(
main_block, fp16_allreduce=fp16_allreduce)
main_block,
fp16_allreduce=fp16_allreduce,
user_defined_strategy=strategy)

len_of_ops = len(main_block.ops)
first_optimize_op_index = get_first_optimize_op_idx(main_block)
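Conceptually, passing the strategy into _accumulate_gradients lets the pass accumulate one fused buffer per micro-batch instead of one add per parameter. A standalone numpy sketch of that idea, purely illustrative and not a reproduction of _accumulate_gradients:

import numpy as np

shapes = [(4, 4), (8,), (2, 3)]                       # pretend parameter shapes
total = sum(int(np.prod(s)) for s in shapes)
fused_merged_grad = np.zeros(total, dtype='float32')  # the coalesced merged grads

k_steps = 4                                           # micro-batches per update
for _ in range(k_steps):
    # Per-parameter grads of this micro-batch ...
    grads = [np.random.rand(*s).astype('float32') for s in shapes]
    # ... coalesced into one flat buffer (the role of coalesce_tensor) ...
    fused_grad = np.concatenate([g.ravel() for g in grads])
    # ... and accumulated with a single fused add.
    fused_merged_grad += fused_grad

fused_merged_grad /= k_steps   # average once before the single allreduce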