diff --git a/paddle/fluid/imperative/reducer.cc b/paddle/fluid/imperative/reducer.cc
index 0f6676ed48f34..4166edf32573c 100644
--- a/paddle/fluid/imperative/reducer.cc
+++ b/paddle/fluid/imperative/reducer.cc
@@ -527,6 +527,7 @@ void Reducer::TraverseBackwardGraph(
 void Reducer::PrepareForBackward(
     const std::vector<std::shared_ptr<imperative::VarBase>> &outputs) {
   VLOG(3) << "after forward, then reset count for backward.";
+  grad_need_hooks_ = true;
   next_group_ = 0;
   std::for_each(groups_.begin(), groups_.end(), [](Group &group) {
     group.pending_ = group.variable_indices_.size();
@@ -599,6 +600,11 @@ void Reducer::AddDistHook(size_t var_index) {
                         "than %d, but it is %d",
                         variable_locators_.size(),
                         var_index));
+
+  // Gradient synchronization is not required when grad_need_hooks_ is false.
+  if (!grad_need_hooks_) {
+    return;
+  }
+
   VLOG(3) << "Var[" << var_index << "] ["
           << vars_[var_index]->GradVarBase()->Name()
           << "] arrived and triggered disthook";
@@ -692,8 +698,8 @@ void Reducer::MarkVarReady(const size_t var_index, const bool is_used_var) {
     auto var_base = vars_[var_index]->GradVarBase();
     auto tensor =
         var_base->MutableVar()->GetMutable<framework::LoDTensor>();
-    TensorCopy(*tensor, place_, *dev_ctx, &group_tensor);
-    group_tensor.Resize({static_cast<int64_t>(length)});
+    group_tensor.ShareDataWith(*tensor).Resize(
+        {static_cast<int64_t>(length)});
   } else {
     group_tensor.Resize({static_cast<int64_t>(length)});
     operators::math::set_constant(*dev_ctx, &group_tensor, 0.0);
@@ -907,6 +913,10 @@ void Reducer::ProcessUnusedDenseVars() {
     // 3. create grad var base or get grad var base
     auto grad_var_base_tmp = dest_var_base->MutableGradVarBase();
+    // NOTE(haohongxiang): Calling SetIsEmpty here is to make sure that
+    // gradient accumulation can continue normally after clear_gradients(),
+    // especially in cases involving complex control flow.
+    grad_var_base_tmp->SharedVar()->SetIsEmpty(false);
 
     // 4. set grad tensor
     auto *dest_grad_tensor =
@@ -942,6 +952,7 @@ bool Reducer::HasGrad(size_t var_index) {
 void Reducer::FinalizeBackward() {
   groups_need_finalize_ = false;
+  grad_need_hooks_ = false;
 #ifdef PADDLE_WITH_XPU_BKCL
   {
     std::unique_lock<std::mutex> lock(mutex_);
diff --git a/paddle/fluid/imperative/reducer.h b/paddle/fluid/imperative/reducer.h
index 8392ab2c704d5..3cc40f7b1306a 100644
--- a/paddle/fluid/imperative/reducer.h
+++ b/paddle/fluid/imperative/reducer.h
@@ -209,6 +209,12 @@ class Reducer {
   std::condition_variable cv_;
 #endif
 
+  // grad_need_hooks_ is used to mark whether gradient synchronization is
+  // required across processes. It defaults to false, is set to true while
+  // preparing for backward after backward() is called, and reverts to
+  // false when backward is finalized.
+  bool grad_need_hooks_{false};
+
   // it just for checking hook, each parameter can only trigger one hook
   std::vector<bool> vars_marked_ready_;
 
diff --git a/python/paddle/fluid/dygraph/parallel.py b/python/paddle/fluid/dygraph/parallel.py
index a905e1dba8467..15f4eece4487c 100644
--- a/python/paddle/fluid/dygraph/parallel.py
+++ b/python/paddle/fluid/dygraph/parallel.py
@@ -19,6 +19,7 @@
 from collections import OrderedDict
 import itertools
 import warnings
+from contextlib import contextmanager
 
 import paddle
 from paddle.fluid import core
@@ -483,6 +484,7 @@ def __init__(self,
 
         self._layers = layers
         self.find_unused_parameters = find_unused_parameters
+        self.grad_need_sync = True
 
         # NOTE(chenweihang): The ParallelStrategy here is not strictly a strategy.
         # It just stores some environment variables, which can be constructed by
@@ -576,9 +578,55 @@ def _find_varbase(self, obj):
             return itertools.chain(*map(self._find_varbase, obj.values()))
         return []
 
+    @contextmanager
+    def no_sync(self):
+        """
+        A context manager to stop gradient synchronization. Within no_sync(),
+        parameter gradients will only be accumulated locally and will not be
+        synchronized until the first forward-backward pass outside this context.
+
+        Examples:
+            .. code-block:: python
+
+                # required: distributed
+                import paddle
+                import paddle.nn as nn
+                import paddle.distributed as dist
+
+                class SimpleNet(nn.Layer):
+                    def __init__(self):
+                        super(SimpleNet, self).__init__()
+                        self._linear = nn.Linear(10, 1)
+
+                    def forward(self, x):
+                        return self._linear(x)
+
+                dist.init_parallel_env()
+                model = SimpleNet()
+                dp_model = paddle.DataParallel(model)
+
+                inputs_1 = paddle.randn([10, 10], 'float32')
+                inputs_2 = paddle.ones([10, 10], 'float32')
+
+                with dp_model.no_sync():
+                    # gradients will not be synchronized
+                    dp_model(inputs_1).backward()
+
+                # synchronization happens here
+                dp_model(inputs_2).backward()
+
+        """
+        tmp_grad_need_sync = self.grad_need_sync
+        self.grad_need_sync = False
+        try:
+            yield
+        finally:
+            self.grad_need_sync = tmp_grad_need_sync
+
     def forward(self, *inputs, **kwargs):
         outputs = self._layers(*inputs, **kwargs)
-        if self._strategy.nranks > 1 and framework._dygraph_tracer()._has_grad:
+        if self._strategy.nranks > 1 and framework._dygraph_tracer(
+        )._has_grad and self.grad_need_sync:
             self._reducer.prepare_for_backward(
                 list(self._find_varbase(outputs)))
         return outputs
diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt
index 007221ca4f9ca..eba1a970d0cfe 100644
--- a/python/paddle/fluid/tests/unittests/CMakeLists.txt
+++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt
@@ -23,6 +23,8 @@ list(APPEND DIST_TEST_OPS test_fleet_graph_execution_meta_optimizer)
 list(APPEND DIST_TEST_OPS test_gen_nccl_id_op)
 list(APPEND DIST_TEST_OPS test_parallel_dygraph_unused_variables)
 list(APPEND DIST_TEST_OPS test_parallel_dygraph_control_flow)
+list(APPEND DIST_TEST_OPS test_parallel_dygraph_no_sync)
+list(APPEND DIST_TEST_OPS test_parallel_dygraph_no_sync_gradient_check)
 list(APPEND DIST_TEST_OPS test_parallel_dygraph_dataparallel)
 list(APPEND DIST_TEST_OPS test_parallel_dygraph_pipeline_parallel)
 list(APPEND DIST_TEST_OPS test_parallel_dygraph_tensor_parallel)
@@ -186,6 +188,8 @@ if ((NOT WITH_GPU) AND (NOT WITH_ROCM))
     LIST(REMOVE_ITEM TEST_OPS test_parallel_dygraph_transformer)
     LIST(REMOVE_ITEM TEST_OPS test_parallel_dygraph_sync_batch_norm)
     list(REMOVE_ITEM TEST_OPS test_parallel_dygraph_control_flow)
+    list(REMOVE_ITEM TEST_OPS test_parallel_dygraph_no_sync)
+    list(REMOVE_ITEM TEST_OPS test_parallel_dygraph_no_sync_gradient_check)
     list(REMOVE_ITEM TEST_OPS test_parallel_dygraph_dataparallel)
     list(REMOVE_ITEM TEST_OPS test_parallel_dygraph_pipeline_parallel)
     list(REMOVE_ITEM TEST_OPS test_parallel_dygraph_tensor_parallel)
@@ -902,6 +906,8 @@ if(WITH_DISTRIBUTE AND WITH_GPU AND WITH_NCCL)
     set_tests_properties(test_parallel_dygraph_dataparallel PROPERTIES TIMEOUT 120)
     set_tests_properties(test_parallel_dygraph_unused_variables PROPERTIES TIMEOUT 120)
     set_tests_properties(test_parallel_dygraph_control_flow PROPERTIES TIMEOUT 120)
+    set_tests_properties(test_parallel_dygraph_no_sync PROPERTIES TIMEOUT 120)
+    set_tests_properties(test_parallel_dygraph_no_sync_gradient_check PROPERTIES TIMEOUT 30)
set_tests_properties(test_parallel_dygraph_pipeline_parallel PROPERTIES TIMEOUT 120) set_tests_properties(test_parallel_dygraph_tensor_parallel PROPERTIES TIMEOUT 200) set_tests_properties(test_parallel_dygraph_sharding_parallel PROPERTIES TIMEOUT 120) diff --git a/python/paddle/fluid/tests/unittests/parallel_dygraph_no_sync.py b/python/paddle/fluid/tests/unittests/parallel_dygraph_no_sync.py new file mode 100644 index 0000000000000..0e7e1a32cfa05 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/parallel_dygraph_no_sync.py @@ -0,0 +1,175 @@ +# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function + +import os +import contextlib +import unittest +import numpy as np +import six +import pickle +import random + +import paddle +import paddle.fluid as fluid +import paddle.distributed as dist +import paddle.fluid.dygraph as dygraph +from paddle.fluid import core +from paddle.fluid.dygraph.nn import Linear +from test_dist_base import print_to_err, print_to_out, runtime_main, TestParallelDyGraphRunnerBase + +seed = 90 +RUN_STEP = 20 +batch_size = 4 +batch_num = 1000 + + +class SimpleNet(fluid.Layer): + def __init__(self): + super(SimpleNet, self).__init__() + self.net_a = Linear(input_dim=10, output_dim=20) + self.net_b = Linear(input_dim=20, output_dim=5) + self.net_c = Linear(input_dim=5, output_dim=10) + + def forward(self, x): + x = self.net_a(x) + x = self.net_b(x) + x = self.net_c(x) + return x + + +class TestNoSync(TestParallelDyGraphRunnerBase): + def get_model(self): + model = SimpleNet() + train_reader = paddle.batch( + fake_sample_reader(), batch_size=batch_size, drop_last=True) + optimizer = paddle.optimizer.SGD(learning_rate=0.001, + parameters=model.parameters()) + return model, train_reader, optimizer + + def run_one_loop(self, model, optimizer, batch): + x_data = np.array([x for x in batch]) + x_data = x_data.reshape((-1, 10)) + x = paddle.to_tensor(x_data) + out = model(x) + loss = out.sum() / len(batch) + return loss + + def run_trainer(self, args): + if fluid.core.is_compiled_with_cuda(): + device_id = int(os.getenv("FLAGS_selected_gpus", "0")) + place = fluid.CUDAPlace(device_id) + else: + assert ("Only support CUDAPlace for now.") + + with fluid.dygraph.guard(place): + fluid.default_startup_program().random_seed = seed + fluid.default_main_program().random_seed = seed + np.random.seed(seed) + random.seed(seed) + model, train_reader, opt = self.get_model() + + if args.update_method == "nccl2": + dist.init_parallel_env() + print_to_err( + type(self).__name__, + "begin to prepare context in dygraph with nccl2") + if not args.find_unused_parameters: + model = paddle.DataParallel( + model, find_unused_parameters=False) + else: + model = paddle.DataParallel( + model, find_unused_parameters=True) + print_to_err(type(self).__name__, "model built in dygraph") + out_losses = [] + print_to_err(type(self).__name__, "begin to run dygraph training") + for step_id, data in 
enumerate(train_reader()): + data = self._get_data(data, args) + if step_id == RUN_STEP: + break + if step_id % 3 != 0: + if args.update_method == "nccl2": + with model.no_sync(): + loss = self.run_one_loop(model, opt, data) + loss.backward() + else: + loss = self.run_one_loop(model, opt, data) + loss.backward() + else: + loss = self.run_one_loop(model, opt, data) + loss.backward() + opt.minimize(loss) + print_to_err( + type(self).__name__, + "loss at step %d: %f" % (step_id, loss.numpy())) + out_losses.append(loss.numpy()) + + if not args.accumulate_gradient: + model.clear_gradients() + print_to_out(out_losses) + + def run_trainer_with_spawn(self, args): + fluid.default_startup_program().random_seed = seed + fluid.default_main_program().random_seed = seed + np.random.seed(seed) + random.seed(seed) + args.trainer_id = dist.get_rank() + + if args.update_method == "nccl2": + dist.init_parallel_env() + model, train_reader, opt = self.get_model() + if args.update_method == "nccl2": + if args.find_unused_parameters: + model = paddle.DataParallel(model, find_unused_parameters=True) + else: + model = paddle.DataParallel(model, find_unused_parameters=False) + + out_losses = [] + for step_id, data in enumerate(train_reader()): + data = self._get_data(data, args) + if step_id == RUN_STEP: + break + if step_id % 3 != 0: + if args.update_method == "nccl2": + with model.no_sync(): + loss = self.run_one_loop(model, opt, data) + loss.backward() + else: + loss = self.run_one_loop(model, opt, data) + loss.backward() + else: + loss = self.run_one_loop(model, opt, data) + loss.backward() + opt.minimize(loss) + print_to_err( + type(self).__name__, + "loss at step %d: %f" % (step_id, loss.numpy())) + out_losses.append(loss.numpy()) + model.clear_gradients() + print_to_out(out_losses) + return out_losses + + +def fake_sample_reader(): + def __reader__(): + for i in range(batch_num): + x_data = np.random.random_sample((10, )).astype('float32') + yield x_data + + return __reader__ + + +if __name__ == "__main__": + runtime_main(TestNoSync) diff --git a/python/paddle/fluid/tests/unittests/parallel_dygraph_no_sync_control_flow.py b/python/paddle/fluid/tests/unittests/parallel_dygraph_no_sync_control_flow.py new file mode 100644 index 0000000000000..ebc0cd7d6f3de --- /dev/null +++ b/python/paddle/fluid/tests/unittests/parallel_dygraph_no_sync_control_flow.py @@ -0,0 +1,176 @@ +# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from __future__ import print_function + +import os +import contextlib +import unittest +import numpy as np +import six +import pickle +import random + +import paddle +import paddle.fluid as fluid +import paddle.distributed as dist +import paddle.fluid.dygraph as dygraph +from paddle.fluid import core +from paddle.fluid.dygraph.nn import Linear +from test_dist_base import print_to_err, print_to_out, runtime_main, TestParallelDyGraphRunnerBase + +seed = 90 +RUN_STEP = 20 +batch_size = 4 +batch_num = 1000 + + +class SimpleNetControlFlow(fluid.Layer): + def __init__(self): + super(SimpleNetControlFlow, self).__init__() + self.net_a = Linear(input_dim=10, output_dim=20) + self.net_b = Linear(input_dim=20, output_dim=5) + self.net_c = Linear(input_dim=5, output_dim=10) + self.step = 0 + + def forward(self, x): + self.step = self.step + 1 + x = self.net_a(x) + if self.step > 10: + x.stop_gradient = True + x = self.net_b(x) + x = self.net_c(x) + return x + + +class TestNoSyncControlFlow(TestParallelDyGraphRunnerBase): + def get_model(self): + model = SimpleNetControlFlow() + train_reader = paddle.batch( + fake_sample_reader(), batch_size=batch_size, drop_last=True) + optimizer = paddle.optimizer.SGD(learning_rate=0.001, + parameters=model.parameters()) + return model, train_reader, optimizer + + def run_one_loop(self, model, optimizer, batch): + x_data = np.array([x for x in batch]) + x_data = x_data.reshape((-1, 10)) + x = paddle.to_tensor(x_data) + out = model(x) + loss = out.sum() / len(batch) + return loss + + def run_trainer(self, args): + if fluid.core.is_compiled_with_cuda(): + device_id = int(os.getenv("FLAGS_selected_gpus", "0")) + place = fluid.CUDAPlace(device_id) + else: + assert ("Only support CUDAPlace for now.") + + with fluid.dygraph.guard(place): + fluid.default_startup_program().random_seed = seed + fluid.default_main_program().random_seed = seed + np.random.seed(seed) + random.seed(seed) + model, train_reader, opt = self.get_model() + + if args.update_method == "nccl2": + dist.init_parallel_env() + print_to_err( + type(self).__name__, + "begin to prepare context in dygraph with nccl2") + if not args.find_unused_parameters: + model = paddle.DataParallel( + model, find_unused_parameters=False) + else: + model = paddle.DataParallel( + model, find_unused_parameters=True) + print_to_err(type(self).__name__, "model built in dygraph") + out_losses = [] + print_to_err(type(self).__name__, "begin to run dygraph training") + for step_id, data in enumerate(train_reader()): + data = self._get_data(data, args) + if step_id == RUN_STEP: + break + if step_id % 3 != 0: + if args.update_method == "nccl2": + with model.no_sync(): + loss = self.run_one_loop(model, opt, data) + loss.backward() + else: + loss = self.run_one_loop(model, opt, data) + loss.backward() + else: + loss = self.run_one_loop(model, opt, data) + loss.backward() + opt.minimize(loss) + print_to_err( + type(self).__name__, + "loss at step %d: %f" % (step_id, loss.numpy())) + out_losses.append(loss.numpy()) + + if not args.accumulate_gradient: + model.clear_gradients() + print_to_out(out_losses) + + def run_trainer_with_spawn(self, args): + fluid.default_startup_program().random_seed = seed + fluid.default_main_program().random_seed = seed + np.random.seed(seed) + random.seed(seed) + args.trainer_id = dist.get_rank() + + if args.update_method == "nccl2": + dist.init_parallel_env() + model, train_reader, opt = self.get_model() + if args.update_method == "nccl2": + model = paddle.DataParallel(model, find_unused_parameters=True) + 
+ out_losses = [] + for step_id, data in enumerate(train_reader()): + data = self._get_data(data, args) + if step_id == RUN_STEP: + break + if step_id % 3 != 0: + if args.update_method == "nccl2": + with model.no_sync(): + loss = self.run_one_loop(model, opt, data) + loss.backward() + else: + loss = self.run_one_loop(model, opt, data) + loss.backward() + else: + loss = self.run_one_loop(model, opt, data) + loss.backward() + opt.minimize(loss) + print_to_err( + type(self).__name__, + "loss at step %d: %f" % (step_id, loss.numpy())) + out_losses.append(loss.numpy()) + model.clear_gradients() + print_to_out(out_losses) + return out_losses + + +def fake_sample_reader(): + def __reader__(): + for i in range(batch_num): + x_data = np.random.random_sample((10, )).astype('float32') + yield x_data + + return __reader__ + + +if __name__ == "__main__": + runtime_main(TestNoSyncControlFlow) diff --git a/python/paddle/fluid/tests/unittests/parallel_dygraph_no_sync_gradient_check.py b/python/paddle/fluid/tests/unittests/parallel_dygraph_no_sync_gradient_check.py new file mode 100644 index 0000000000000..642ea14d8a87d --- /dev/null +++ b/python/paddle/fluid/tests/unittests/parallel_dygraph_no_sync_gradient_check.py @@ -0,0 +1,138 @@ +# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from __future__ import division +from __future__ import print_function + +import unittest + +import paddle +import numpy as np +import paddle.distributed as dist +import paddle.fluid as fluid +from paddle.fluid.dygraph.nn import Linear + +paddle.seed(1024) +np.random.seed(2021) + +batch = 1 +in_dim = 10 +out_dim = 20 + + +class SimpleNet(fluid.Layer): + def __init__(self, train_id): + super(SimpleNet, self).__init__() + self.w1 = self.create_parameter( + shape=[in_dim, out_dim], dtype="float32") + self.w2 = self.create_parameter( + shape=[in_dim, out_dim], dtype="float32") + self.share_net = Linear(out_dim, 1) + + self.unused_param = self.create_parameter( + shape=[out_dim, in_dim], dtype="float32") + + # just for test sync_params_buffers + self.register_buffer("queue", paddle.randn([10, 5])) + self.queue = paddle.nn.functional.normalize(self.queue, axis=0) + self.register_buffer("queue_ptr", paddle.zeros([1], 'int64')) + + self.trainer_id = train_id + + def forward(self, x): + is_use = (paddle.equal_all( + x, paddle.ones(shape=(batch, in_dim))).numpy()[0] and + self.trainer_id == 1) + + if is_use: + tmp = paddle.matmul(x, self.w1) + else: + tmp = paddle.matmul(x, self.w2) + + return self.share_net(tmp) + + +class TestDistTraning(unittest.TestCase): + def test_multiple_gpus(self): + self.trainer_id = dist.get_rank() + dist.init_parallel_env() + + model_a = SimpleNet(self.trainer_id) + model_b = SimpleNet(self.trainer_id) + + state_dict = model_a.state_dict() + model_b.set_state_dict(state_dict) + + model_a = paddle.DataParallel(model_a, find_unused_parameters=True) + model_b = paddle.DataParallel(model_b, find_unused_parameters=True) + + ones_input = paddle.ones(shape=(batch, in_dim)) + ones_input.stop_gradient = True + + for step_id in range(1, 31): + random_input = paddle.rand(shape=(batch, in_dim)) + random_input.stop_gradient = True + + if step_id % 5 != 0: + with model_a.no_sync(): + self.dp_layer(step_id, model_a, model_b, random_input, + ones_input) + else: + self.dp_layer(step_id, model_a, model_b, random_input, + ones_input) + + self.check_gradient(model_a.parameters()) + self.check_gradient(model_b.parameters()) + + self.check_acc(model_a._layers.w1.grad, model_b._layers.w1.grad) + self.check_acc(model_a._layers.w2.grad, model_b._layers.w2.grad) + + model_a.clear_gradients() + model_b.clear_gradients() + + def dp_layer(self, step_id, model_a, model_b, random_input, ones_input): + if step_id % 2 == 0: + out_a = model_a(random_input) + out_b = model_b(random_input) + else: + out_a = model_a(ones_input) + out_b = model_b(ones_input) + out_a.sum().backward() + out_b.sum().backward() + + def check_acc(self, grad, acc_grad): + grad = grad.numpy() if grad is not None else None + acc_grad = acc_grad.numpy() if acc_grad is not None else None + return np.testing.assert_allclose(grad, acc_grad, rtol=1e-6) + + def print_trainer_0(self, *args): + if self.trainer_id == 0: + print(*args) + + def broadcast_param(self, param, root): + paddle.distributed.broadcast(param, root) + return param + + def check_gradient(self, params): + other_param = [] + for param in params: + if param.trainable and (param._grad_ivar() is not None): + grad = param._grad_ivar() + other_grad = self.broadcast_param(grad.clone(), root=1) + if self.trainer_id == 0: + np.testing.assert_allclose(other_grad.numpy(), grad.numpy()) + + +if __name__ == '__main__': + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/parallel_dygraph_no_sync_unused_params.py 
b/python/paddle/fluid/tests/unittests/parallel_dygraph_no_sync_unused_params.py new file mode 100644 index 0000000000000..a5ab327b7788a --- /dev/null +++ b/python/paddle/fluid/tests/unittests/parallel_dygraph_no_sync_unused_params.py @@ -0,0 +1,179 @@ +# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function + +import os +import contextlib +import unittest +import numpy as np +import six +import pickle +import random + +import paddle +import paddle.fluid as fluid +import paddle.distributed as dist +import paddle.fluid.dygraph as dygraph +from paddle.fluid import core +from paddle.fluid.dygraph.nn import Linear +from test_dist_base import print_to_err, print_to_out, runtime_main, TestParallelDyGraphRunnerBase + +seed = 90 +RUN_STEP = 20 +batch_size = 4 +batch_num = 1000 + + +class SimpleNetUnusedParam(fluid.Layer): + def __init__(self): + super(SimpleNetUnusedParam, self).__init__() + self.net_a = Linear(input_dim=10, output_dim=20) + self.net_b = Linear(input_dim=20, output_dim=5) + self.net_c = Linear(input_dim=5, output_dim=10) + + self.net_d = Linear(input_dim=20, output_dim=10) + + def forward(self, x): + x = self.net_a(x) + x.stop_gradient = True + x = self.net_b(x) + x = self.net_c(x) + return x + + +class TestNoSyncUnusedParam(TestParallelDyGraphRunnerBase): + def get_model(self): + model = SimpleNetUnusedParam() + train_reader = paddle.batch( + fake_sample_reader(), batch_size=batch_size, drop_last=True) + optimizer = paddle.optimizer.SGD(learning_rate=0.001, + parameters=model.parameters()) + return model, train_reader, optimizer + + def run_one_loop(self, model, optimizer, batch): + x_data = np.array([x for x in batch]) + x_data = x_data.reshape((-1, 10)) + x = paddle.to_tensor(x_data) + out = model(x) + loss = out.sum() / len(batch) + return loss + + def run_trainer(self, args): + if fluid.core.is_compiled_with_cuda(): + device_id = int(os.getenv("FLAGS_selected_gpus", "0")) + place = fluid.CUDAPlace(device_id) + else: + assert ("Only support CUDAPlace for now.") + + with fluid.dygraph.guard(place): + fluid.default_startup_program().random_seed = seed + fluid.default_main_program().random_seed = seed + np.random.seed(seed) + random.seed(seed) + model, train_reader, opt = self.get_model() + + if args.update_method == "nccl2": + dist.init_parallel_env() + print_to_err( + type(self).__name__, + "begin to prepare context in dygraph with nccl2") + if not args.find_unused_parameters: + model = paddle.DataParallel( + model, find_unused_parameters=False) + else: + model = paddle.DataParallel( + model, find_unused_parameters=True) + print_to_err(type(self).__name__, "model built in dygraph") + out_losses = [] + print_to_err(type(self).__name__, "begin to run dygraph training") + for step_id, data in enumerate(train_reader()): + data = self._get_data(data, args) + if step_id == RUN_STEP: + break + if step_id % 3 != 0: + if args.update_method == "nccl2": + with model.no_sync(): + loss = 
self.run_one_loop(model, opt, data) + loss.backward() + else: + loss = self.run_one_loop(model, opt, data) + loss.backward() + else: + loss = self.run_one_loop(model, opt, data) + loss.backward() + opt.minimize(loss) + print_to_err( + type(self).__name__, + "loss at step %d: %f" % (step_id, loss.numpy())) + out_losses.append(loss.numpy()) + + if not args.accumulate_gradient: + model.clear_gradients() + print_to_out(out_losses) + + def run_trainer_with_spawn(self, args): + paddle.disable_static() + fluid.default_startup_program().random_seed = seed + fluid.default_main_program().random_seed = seed + np.random.seed(seed) + random.seed(seed) + args.trainer_id = dist.get_rank() + + if args.update_method == "nccl2": + dist.init_parallel_env() + model, train_reader, opt = self.get_model() + if args.update_method == "nccl2": + if args.find_unused_parameters: + model = paddle.DataParallel(model, find_unused_parameters=True) + else: + model = paddle.DataParallel(model, find_unused_parameters=False) + + out_losses = [] + for step_id, data in enumerate(train_reader()): + data = self._get_data(data, args) + if step_id == RUN_STEP: + break + if step_id % 3 != 0: + if args.update_method == "nccl2": + with model.no_sync(): + loss = self.run_one_loop(model, opt, data) + loss.backward() + else: + loss = self.run_one_loop(model, opt, data) + loss.backward() + else: + loss = self.run_one_loop(model, opt, data) + loss.backward() + opt.minimize(loss) + print_to_err( + type(self).__name__, + "loss at step %d: %f" % (step_id, loss.numpy())) + out_losses.append(loss.numpy()) + model.clear_gradients() + print_to_out(out_losses) + return out_losses + + +def fake_sample_reader(): + def __reader__(): + for i in range(batch_num): + x_data = np.random.random_sample((10, )).astype('float32') + yield x_data + + return __reader__ + + +if __name__ == "__main__": + runtime_main(TestNoSyncUnusedParam) diff --git a/python/paddle/fluid/tests/unittests/test_parallel_dygraph_no_sync.py b/python/paddle/fluid/tests/unittests/test_parallel_dygraph_no_sync.py new file mode 100644 index 0000000000000..a1a8ae52eb787 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_parallel_dygraph_no_sync.py @@ -0,0 +1,100 @@ +# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from __future__ import print_function + +import os +import sys +import unittest + +import paddle.fluid as fluid +from test_dist_base import TestDistBase +from spawn_runner_base import TestDistSpawnRunner +from parallel_dygraph_no_sync import TestNoSync +from parallel_dygraph_no_sync_unused_params import TestNoSyncUnusedParam +from parallel_dygraph_no_sync_control_flow import TestNoSyncControlFlow + +flag_name = os.path.splitext(__file__)[0] + + +class TestParallelDygraphNoSync(TestDistBase): + def _setup_config(self): + self._sync_mode = False + self._nccl2_mode = True + self._dygraph = True + self._find_unused_parameters = False + + def test_no_sync(self): + if fluid.core.is_compiled_with_cuda(): + self.check_with_place( + "parallel_dygraph_no_sync.py", + delta=1e-5, + check_error_log=True, + log_name=flag_name) + + +class TestParallelDygraphNoSyncUnusedParam(TestDistBase): + def _setup_config(self): + self._sync_mode = False + self._nccl2_mode = True + self._dygraph = True + self._find_unused_parameters = True + + def test_no_sync_ununsed_param(self): + if fluid.core.is_compiled_with_cuda(): + self.check_with_place( + "parallel_dygraph_no_sync_unused_params.py", + delta=1e-5, + check_error_log=True, + log_name=flag_name) + + +class TestParallelDygraphNoSyncControlFlow(TestDistBase): + def _setup_config(self): + self._sync_mode = False + self._nccl2_mode = True + self._dygraph = True + self._find_unused_parameters = True + + def test_no_sync_control_flow(self): + if fluid.core.is_compiled_with_cuda(): + self.check_with_place( + "parallel_dygraph_no_sync_control_flow.py", + delta=1e-5, + check_error_log=True, + log_name=flag_name) + + +class TestParallelDygraphNoSyncSpawn(TestDistSpawnRunner): + def test_no_sync_with_spawn(self): + if fluid.core.is_compiled_with_cuda() and sys.version_info >= (3, 4): + self.check_dist_result_with_spawn(test_class=TestNoSync, delta=1e-5) + + +class TestParallelDygraphNoSyncUnusedParamSpawn(TestDistSpawnRunner): + def test_no_sync_with_spawn(self): + if fluid.core.is_compiled_with_cuda() and sys.version_info >= (3, 4): + self.check_dist_result_with_spawn( + test_class=TestNoSyncUnusedParam, delta=1e-5) + + +class TestParallelDygraphNoSyncControlFlowSpawn(TestDistSpawnRunner): + def test_no_sync_with_spawn(self): + if fluid.core.is_compiled_with_cuda() and sys.version_info >= (3, 4): + self.check_dist_result_with_spawn( + test_class=TestNoSyncControlFlow, delta=1e-5) + + +if __name__ == "__main__": + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_parallel_dygraph_no_sync_gradient_check.py b/python/paddle/fluid/tests/unittests/test_parallel_dygraph_no_sync_gradient_check.py new file mode 100644 index 0000000000000..f3fc13f3eea1b --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_parallel_dygraph_no_sync_gradient_check.py @@ -0,0 +1,29 @@ +# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+from __future__ import print_function
+
+import unittest
+import paddle.fluid as fluid
+
+from test_parallel_dygraph_dataparallel import TestMultipleGpus
+
+
+class TestModelParallelLayer(TestMultipleGpus):
+    def test_parallel_dygraph_dataparallel_no_sync(self):
+        self.run_mnist_2gpu('parallel_dygraph_no_sync_gradient_check.py')
+
+
+if __name__ == "__main__":
+    unittest.main()
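A minimal usage sketch (not part of the patch) of the no_sync() API added above, showing the gradient-accumulation pattern the new tests exercise: several forward-backward passes run under no_sync() so the reducer hooks stay disabled, then one normal pass triggers a single all-reduce of the accumulated gradients. The layer sizes, step counts, and the name accumulate_steps are illustrative assumptions, and the script assumes it is launched on multiple GPUs via paddle.distributed.launch.

    # Hypothetical example; assumes a multi-GPU launch with paddle.distributed.launch.
    import paddle
    import paddle.distributed as dist

    dist.init_parallel_env()
    layer = paddle.nn.Linear(10, 1)
    model = paddle.DataParallel(layer)
    opt = paddle.optimizer.SGD(learning_rate=0.001, parameters=model.parameters())

    accumulate_steps = 3  # illustrative choice
    for step in range(12):
        x = paddle.randn([4, 10], 'float32')
        if (step + 1) % accumulate_steps != 0:
            with model.no_sync():
                # Forward and backward both run inside no_sync(), so the reducer is
                # not prepared for backward and gradients only accumulate locally.
                loss = model(x).sum()
                loss.backward()
        else:
            # Normal step: the reducer hooks fire during backward and all-reduce
            # the gradients accumulated over the previous no_sync() steps.
            loss = model(x).sum()
            loss.backward()
            opt.step()
            opt.clear_grad()

The context manager only toggles the Python-side grad_need_sync flag; because prepare_for_backward() is skipped, grad_need_hooks_ stays false in the C++ Reducer and AddDistHook returns early, which is what keeps the no_sync() steps communication-free.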