From 3e5fcb202f5b2a73e84651c295040fd5c3163307 Mon Sep 17 00:00:00 2001
From: esythan
Date: Wed, 28 Jul 2021 16:29:14 +0800
Subject: [PATCH 1/2] add trainer desc config to distributed strategy

---
 .../framework/distributed_strategy.proto      |  8 +++
 .../fleet/base/distributed_strategy.py        | 39 +++++++++++
 .../distributed/fleet/base/fleet_base.py      |  8 +++
 .../test_dist_fleet_trainer_desc_config.py    | 68 +++++++++++++++++++
 .../test_fleet_distributed_strategy.py        | 13 ++++
 5 files changed, 136 insertions(+)
 create mode 100644 python/paddle/fluid/tests/unittests/test_dist_fleet_trainer_desc_config.py

diff --git a/paddle/fluid/framework/distributed_strategy.proto b/paddle/fluid/framework/distributed_strategy.proto
index 0a94b897b9b1d..6a17ccc54f024 100644
--- a/paddle/fluid/framework/distributed_strategy.proto
+++ b/paddle/fluid/framework/distributed_strategy.proto
@@ -145,6 +145,13 @@ message AsyncConfig {
   optional int32 use_ps_gpu = 12 [ default = 0 ];
 }
 
+message TrainerDescConfig {
+  optional string dump_fields_path = 1;
+  repeated string dump_fields = 2;
+  repeated string dump_param = 3;
+  repeated string stat_var_names = 4;
+}
+
 message PipelineConfig {
   optional int32 micro_batch_size = 1 [ default = 1 ];
   optional int32 accumulate_steps = 2 [ default = 1 ];
@@ -205,6 +212,7 @@ message DistributedStrategy {
   optional ShardingConfig sharding_configs = 111;
   optional HybridConfig hybrid_configs = 112;
   optional TensorParallelConfig tensor_parallel_configs = 113;
+  optional TrainerDescConfig trainer_desc_configs = 114;
   optional BuildStrategy build_strategy = 201;
   optional ExecutionStrategy execution_strategy = 202;
   optional GradientScaleConfig gradient_scale_configs = 203;
diff --git a/python/paddle/distributed/fleet/base/distributed_strategy.py b/python/paddle/distributed/fleet/base/distributed_strategy.py
index 86882b0be6fc3..051f6b11c2609 100644
--- a/python/paddle/distributed/fleet/base/distributed_strategy.py
+++ b/python/paddle/distributed/fleet/base/distributed_strategy.py
@@ -360,6 +360,45 @@ def a_sync_configs(self, configs):
                           "a_sync_configs")
         assign_configs_value(self.strategy.a_sync_configs, configs)
 
+    @property
+    def trainer_desc_configs(self):
+        """
+        Set trainer desc configurations.
+
+        **Notes**:
+            dump_fields_path(str): the path where the dumped fields are saved
+
+            dump_fields(list(str)): the fields that you want to dump
+
+            dump_param(list(str)): the parameters that you want to dump
+
+            stat_var_names(list(str)): the names of the statistic variables
+
+        Examples:
+
+          .. code-block:: python
+
+            import paddle.distributed.fleet as fleet
+            role_maker = fleet.PaddleCloudRoleMaker()
+            fleet.init(role_maker)
+
+            strategy = fleet.DistributedStrategy()
+            configs = {"dump_fields_path": "./dump_data", "dump_fields": ["xxx", "yyy"]}
+            strategy.trainer_desc_configs = configs
+
+            # code block for defining loss and local optimizer
+            # sgd = fleet.distributed_optimizer(optimizer, strategy)
+
+        """
+        return get_msg_dict(self.strategy.trainer_desc_configs)
+
+    @trainer_desc_configs.setter
+    @is_strict_auto
+    def trainer_desc_configs(self, configs):
+        check_configs_key(self.strategy.trainer_desc_configs, configs,
+                          "trainer_desc_configs")
+        assign_configs_value(self.strategy.trainer_desc_configs, configs)
+
     @property
     def amp(self):
         """
diff --git a/python/paddle/distributed/fleet/base/fleet_base.py b/python/paddle/distributed/fleet/base/fleet_base.py
index 2a9b15c732541..5ac617376a961 100644
--- a/python/paddle/distributed/fleet/base/fleet_base.py
+++ b/python/paddle/distributed/fleet/base/fleet_base.py
@@ -1460,6 +1460,14 @@ def minimize(self,
             context["graph_optimize_ops"] = optimize_ops
             context["graph_optimize_grads"] = params_grads
 
+        program = paddle.static.default_main_program()
+        opt_info = {}
+        opt_info["mpi_size"] = self.worker_num()
+        opt_info["mpi_rank"] = self.worker_index()
+        for k, v in self._user_defined_strategy.trainer_desc_configs.items():
+            opt_info[k] = v
+        program._fleet_opt = opt_info
+
         if self._runtime_handle is None:
             self._runtime_handle = RuntimeFactory()._create_runtime(context)
 
diff --git a/python/paddle/fluid/tests/unittests/test_dist_fleet_trainer_desc_config.py b/python/paddle/fluid/tests/unittests/test_dist_fleet_trainer_desc_config.py
new file mode 100644
index 0000000000000..baffe8595027f
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/test_dist_fleet_trainer_desc_config.py
@@ -0,0 +1,68 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import time
+import unittest
+
+import paddle
+import paddle.distributed.fleet.base.role_maker as role_maker
+import paddle.fluid.transpiler.details.program_utils as pu
+
+paddle.enable_static()
+
+
+class TestDistStrategyTrainerDescConfig(unittest.TestCase):
+    def setUp(self):
+        os.environ["PADDLE_PSERVER_NUMS"] = "2"
+        os.environ["PADDLE_TRAINERS_NUM"] = "2"
+        os.environ["POD_IP"] = "127.0.0.1"
+        os.environ["PADDLE_PORT"] = "36001"
+        os.environ["PADDLE_TRAINER_ID"] = "0"
+        os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = \
+            "127.0.0.1:36001,127.0.0.2:36001"
+
+    def test_trainer_desc_config(self):
+        os.environ["TRAINING_ROLE"] = "TRAINER"
+        import paddle.distributed.fleet as fleet
+
+        fleet.init(role_maker.PaddleCloudRoleMaker())
+
+        x = paddle.fluid.layers.data(name='x', shape=[1], dtype='float32')
+        y = paddle.fluid.layers.data(name='y', shape=[1], dtype='float32')
+        cost = paddle.fluid.layers.square_error_cost(input=x, label=y)
+        avg_cost = paddle.fluid.layers.mean(cost)
+
+        strategy = paddle.distributed.fleet.DistributedStrategy()
+        config = {
+            "dump_fields_path": "dump_data",
+            "dump_fields": ["xxx", "yyy"],
+            "dump_param": []
+        }
+        strategy.trainer_desc_configs = config
+
+        optimizer = paddle.fluid.optimizer.SGD(learning_rate=0.01)
+        optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
+        optimizer.minimize(avg_cost)
+
+        program = paddle.static.default_main_program()
+        self.assertEqual(program._fleet_opt["dump_fields_path"], "dump_data")
+        self.assertEqual(len(program._fleet_opt["dump_fields"]), 2)
+        self.assertEqual(len(program._fleet_opt["dump_param"]), 0)
+        self.assertEqual(program._fleet_opt["mpi_size"],
+                        int(os.environ["PADDLE_TRAINERS_NUM"]))
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/python/paddle/fluid/tests/unittests/test_fleet_distributed_strategy.py b/python/paddle/fluid/tests/unittests/test_fleet_distributed_strategy.py
index 52895217d3f90..11f5293f7cc6e 100644
--- a/python/paddle/fluid/tests/unittests/test_fleet_distributed_strategy.py
+++ b/python/paddle/fluid/tests/unittests/test_fleet_distributed_strategy.py
@@ -255,6 +255,19 @@ def test_a_sync_configs(self):
         strategy.a_sync_configs = configs
         self.assertEqual(strategy.a_sync_configs["k_steps"], 1000)
 
+    def test_trainer_desc_configs(self):
+        strategy = paddle.distributed.fleet.DistributedStrategy()
+        configs = {
+            "dump_fields_path": "dump_data",
+            "dump_fields": ["xxx", "yyy"],
+            "dump_param": []
+        }
+        strategy.trainer_desc_configs = configs
+        self.assertEqual(strategy.trainer_desc_configs["dump_fields_path"],
+                         "dump_data")
+        self.assertEqual(len(strategy.trainer_desc_configs["dump_fields"]), 2)
+        self.assertEqual(len(strategy.trainer_desc_configs["dump_param"]), 0)
+
     def test_elastic(self):
         strategy = paddle.distributed.fleet.DistributedStrategy()
         strategy.elastic = True

From 21990b3853237637a51501833746eeec8f601ddb Mon Sep 17 00:00:00 2001
From: esythan
Date: Wed, 28 Jul 2021 18:05:16 +0800
Subject: [PATCH 2/2] code style modified

---
 .../tests/unittests/test_dist_fleet_trainer_desc_config.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/paddle/fluid/tests/unittests/test_dist_fleet_trainer_desc_config.py b/python/paddle/fluid/tests/unittests/test_dist_fleet_trainer_desc_config.py
index baffe8595027f..5e46945031300 100644
--- a/python/paddle/fluid/tests/unittests/test_dist_fleet_trainer_desc_config.py
+++ b/python/paddle/fluid/tests/unittests/test_dist_fleet_trainer_desc_config.py
@@ -61,7 +61,7 @@ def test_trainer_desc_config(self):
         self.assertEqual(len(program._fleet_opt["dump_fields"]), 2)
         self.assertEqual(len(program._fleet_opt["dump_param"]), 0)
         self.assertEqual(program._fleet_opt["mpi_size"],
-                        int(os.environ["PADDLE_TRAINERS_NUM"]))
+                         int(os.environ["PADDLE_TRAINERS_NUM"]))
 
 
 if __name__ == "__main__":
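
For quick reference, a minimal sketch of the end-to-end flow these two patches
introduce, condensed from the new unit test above: the trainer_desc_configs set
on a DistributedStrategy are merged, together with mpi_size and mpi_rank, into
program._fleet_opt when minimize() runs. The environment values and dump field
names below are illustrative placeholders, not part of the patch:

    import os
    import paddle
    import paddle.distributed.fleet as fleet
    import paddle.distributed.fleet.base.role_maker as role_maker

    paddle.enable_static()

    # Illustrative parameter-server environment, same shape as the test's setUp().
    os.environ["TRAINING_ROLE"] = "TRAINER"
    os.environ["PADDLE_TRAINERS_NUM"] = "2"
    os.environ["PADDLE_TRAINER_ID"] = "0"
    os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001,127.0.0.2:36001"

    fleet.init(role_maker.PaddleCloudRoleMaker())

    # A toy static-graph model, as in the test.
    x = paddle.fluid.layers.data(name='x', shape=[1], dtype='float32')
    y = paddle.fluid.layers.data(name='y', shape=[1], dtype='float32')
    avg_cost = paddle.fluid.layers.mean(
        paddle.fluid.layers.square_error_cost(input=x, label=y))

    strategy = fleet.DistributedStrategy()
    strategy.trainer_desc_configs = {
        "dump_fields_path": "./dump_data",  # directory for the dumped fields
        "dump_fields": ["xxx", "yyy"],      # variable names to dump (placeholders)
        "dump_param": [],                   # parameters to dump (none here)
    }

    optimizer = fleet.distributed_optimizer(
        paddle.fluid.optimizer.SGD(learning_rate=0.01), strategy=strategy)
    optimizer.minimize(avg_cost)

    # minimize() has copied the merged options onto the main program:
    print(paddle.static.default_main_program()._fleet_opt)
    # e.g. {'mpi_size': 2, 'mpi_rank': 0, 'dump_fields_path': './dump_data',
    #       'dump_fields': ['xxx', 'yyy'], 'dump_param': [], 'stat_var_names': []}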