[hybrid parallel] pipeline support adamw and LRScheduler #34402

Merged
10 changes: 10 additions & 0 deletions python/paddle/fluid/executor.py
@@ -1664,6 +1664,16 @@ def _run_pipeline(self,
print_period, fetch_handler,
use_program_cache)

from paddle.optimizer.lr import LRScheduler
if hasattr(program, 'lr_sheduler'):
lr_sheduler = program.lr_sheduler
assert isinstance(lr_sheduler, LRScheduler), "must be LRScheduler"
lr_value = lr_sheduler()
lr_var = program.global_block().vars[lr_sheduler._var_name]
data = np.array([lr_value]).astype(convert_dtype(lr_var.dtype))
tensor = core.get_variable_tensor(scope, lr_sheduler._var_name)
tensor.set(data, self.place)

self._default_executor.run_from_dataset(trainer_instance)

if not use_program_cache:
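For reference, the scheduler that `_run_pipeline` reads here is any `paddle.optimizer.lr.LRScheduler` subclass: calling the object returns the current learning rate as a Python float, and `step()` advances it. A minimal sketch of such a scheduler (the class name and decay rule are illustrative, not part of this PR):

from paddle.optimizer.lr import LRScheduler

class HalveEveryN(LRScheduler):
    # Toy scheduler: halve the learning rate every `step_size` calls to step().
    def __init__(self, learning_rate=0.1, step_size=10, last_epoch=-1, verbose=False):
        self.step_size = step_size
        super(HalveEveryN, self).__init__(learning_rate, last_epoch, verbose)

    def get_lr(self):
        return self.base_lr * (0.5 ** (self.last_epoch // self.step_size))

# sched = HalveEveryN(learning_rate=0.1, step_size=10)
# sched()       -> current learning rate, the float the executor writes into the LR variable
# sched.step()  -> advance the schedule before the next run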
8 changes: 8 additions & 0 deletions python/paddle/fluid/optimizer.py
@@ -4634,6 +4634,9 @@ def _add_op_device_attr_for_op(self, op, idx, block):
op.type == 'elementwise_div'):
device = f"{self._device}:all"
op._set_attr(self._op_device_key, device)
elif self._is_weight_decay_op(op) and op.type == 'scale':
# set AdamW decay_coeff to device:all
op._set_attr(self._op_device_key, f"{self._device}:all")
elif op.type == "alloc_float_status":
op._set_attr(self._op_device_key, f"{self._device}:all")
else:
@@ -5267,6 +5270,11 @@ def _is_regularization_op(self, op):
return op.desc.has_attr("op_namescope") \
and op.desc.attr("op_namescope").startswith("/regularization")

def _is_weight_decay_op(self, op):
# in AdamW namescope is /optimizer_*/weight decay/
return op.desc.has_attr("op_namescope") \
and 'weight decay' in op.desc.attr("op_namescope")

def _get_input_output_info(self, block):
'''
Get info of op input and output.
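The two optimizer.py hunks work together: AdamW's decoupled weight-decay coefficient is computed by a `scale` op inside a `weight decay` name scope, and the pipeline pass now replicates that op on every stage (`device:all`). A rough way to see which ops `_is_weight_decay_op` would match, assuming a small static-graph program built just for this sketch (the network is illustrative):

import paddle

paddle.enable_static()

main, startup = paddle.static.Program(), paddle.static.Program()
with paddle.static.program_guard(main, startup):
    x = paddle.static.data(name='x', shape=[None, 8], dtype='float32')
    loss = paddle.mean(paddle.static.nn.fc(x, size=1))
    opt = paddle.optimizer.AdamW(learning_rate=0.01, weight_decay=0.01)
    opt.minimize(loss)

# Ops created by AdamW's decoupled weight decay carry an op_namescope like
# "/optimizer_*/weight decay/", which is exactly what _is_weight_decay_op checks.
for op in main.global_block().ops:
    ns = op.desc.attr("op_namescope") if op.desc.has_attr("op_namescope") else ""
    if 'weight decay' in ns:
        print(op.type, ns)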
6 changes: 3 additions & 3 deletions python/paddle/fluid/tests/unittests/pipeline_mnist.py
@@ -116,10 +116,10 @@ def get_model(self, batch_size=2, use_dgc=False, dist_strategy=None):
steps_per_pass = 10
bd = [steps_per_pass * p for p in passes]
lr = [base_lr * (0.1**i) for i in range(len(bd) + 1)]
lr_val = fluid.layers.piecewise_decay(boundaries=bd, values=lr)
opt = fluid.optimizer.Momentum(
lr_val = paddle.optimizer.lr.PiecewiseDecay(boundaries=bd, values=lr)

opt = paddle.optimizer.AdamW(
learning_rate=lr_val,
momentum=0.9,
grad_clip=fluid.clip.GradientClipByGlobalNorm(clip_norm=1.0))

acc_steps = 2 # accumulated steps for pipeline
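The test now builds the learning rate and optimizer through the 2.x API, which is what attaches `lr_sheduler` to the program and exercises the new executor path. In isolation the pattern looks like this (boundaries and values below are placeholders, not the test's exact schedule; `ClipGradByGlobalNorm` is the `paddle.nn` equivalent of the fluid clip used in the test):

import paddle

base_lr = 0.001
bd = [10, 20, 30]                                    # step boundaries
lr = [base_lr * (0.1 ** i) for i in range(len(bd) + 1)]

scheduler = paddle.optimizer.lr.PiecewiseDecay(boundaries=bd, values=lr)
opt = paddle.optimizer.AdamW(
    learning_rate=scheduler,
    grad_clip=paddle.nn.ClipGradByGlobalNorm(clip_norm=1.0))

# After each executor step the driver advances the schedule:
#   loss = exe.run(main_program, fetch_list=[avg_cost])
#   scheduler.step()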
21 changes: 20 additions & 1 deletion python/paddle/fluid/tests/unittests/test_dist_base.py
@@ -96,6 +96,15 @@ def get_transpiler(trainer_id,
current_endpoint=current_endpoint)
return t

@staticmethod
def get_lr_scheduler(program):
lr_sheduler = None
if hasattr(program, 'lr_sheduler'):
from paddle.optimizer.lr import LRScheduler
lr_sheduler = program.lr_sheduler
assert isinstance(lr_sheduler, LRScheduler), "must be LRScheduler"
return lr_sheduler

def run_pserver(self, args):
self.lr = args.lr
self.get_model(batch_size=args.batch_size)
@@ -139,11 +148,17 @@ def run_pipeline_trainer(self, args):
data_loader.start()
print_to_err(type(self).__name__, "begin to train on trainer")
out_losses = []

main_program = fluid.default_main_program()
lr_sheduler = self.get_lr_scheduler(main_program)
for i in six.moves.xrange(RUN_STEP):
loss = exe.run(fluid.default_main_program(), fetch_list=[avg_cost])
loss = exe.run(main_program, fetch_list=[avg_cost])
loss = loss[0] if loss else None
out_losses.append(loss)
print_to_err(type(self).__name__, "run step %d finished" % i)
if lr_sheduler is not None:
lr_sheduler.step()

data_loader.reset()
print_to_err(type(self).__name__, "trainer run finished")

@@ -494,6 +509,7 @@ def get_data():
else:
return origin_batch

lr_scheduler = self.get_lr_scheduler(trainer_prog)
print_to_err(type(self).__name__, "begin to train on trainer")
out_losses = []
for i in six.moves.xrange(RUN_STEP):
@@ -502,6 +518,9 @@
feed=feeder.feed(get_data()))
out_losses.append(loss[0])
print_to_err(type(self).__name__, "run step %d finished" % i)
if lr_scheduler is not None:
lr_scheduler.step()

print_to_err(type(self).__name__, "trainer run finished")

print_to_out(out_losses)
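Both trainer loops now follow the same contract: if the Program carries an `lr_sheduler` attribute, call `step()` once per executor run so the next run writes the updated value into the LR variable. A condensed sketch of that contract (the helper name is made up for illustration):

def train_steps(exe, main_program, fetch_list, num_steps):
    # Hypothetical helper mirroring the loops above: run, record the loss,
    # then advance the scheduler attached to the program (if any).
    losses = []
    scheduler = getattr(main_program, 'lr_sheduler', None)
    for _ in range(num_steps):
        loss, = exe.run(main_program, fetch_list=fetch_list)
        losses.append(loss)
        if scheduler is not None:
            scheduler.step()
    return losses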
5 changes: 4 additions & 1 deletion python/paddle/optimizer/adamw.py
@@ -160,6 +160,7 @@ def __init__(self,
self._apply_decay_param_fun = apply_decay_param_fun
self._coeff = coeff
self._lr_to_coeff = dict()

super(AdamW, self).__init__(
learning_rate=learning_rate,
parameters=parameters,
@@ -211,7 +212,9 @@ def _append_decoupled_weight_decay(self, block, param_and_grad):
# we do this in _create_optimization_pass
decay_coeff = self._lr_to_coeff.get(learning_rate, None)
if decay_coeff is None:
decay_coeff = 1.0 - learning_rate * self._coeff
# NOTE(wangxi): for pipeline to set device:all
with paddle.static.device_guard(None):
decay_coeff = 1.0 - learning_rate * self._coeff
self._lr_to_coeff[learning_rate] = decay_coeff

find_master = (self._multi_precision and
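Per the NOTE, wrapping the coefficient computation in `paddle.static.device_guard(None)` keeps the resulting `scale` op from being pinned to whatever stage-specific device_guard the surrounding model code is under, so the pipeline pass above can tag it with `device:all`. A small sketch of the idea, assuming op placement can be inspected through the op_device attribute (variable names are illustrative):

import paddle

paddle.enable_static()

main = paddle.static.Program()
with paddle.static.program_guard(main):
    lr = paddle.static.create_global_var(
        shape=[1], value=0.01, dtype='float32', name='lr_example')
    # As in the PR: build the coefficient under device_guard(None) so the
    # resulting scale op is not bound to the stage the caller happens to be on.
    with paddle.static.device_guard(None):
        decay_coeff = 1.0 - lr * 0.01

for op in main.global_block().ops:
    dev = op.attr('op_device') if op.has_attr('op_device') else ''
    print(op.type, repr(dev))   # placement is left for the pipeline pass to fill in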